diff --git a/examples/README.md b/examples/README.md index 73e779e..4aca661 100644 --- a/examples/README.md +++ b/examples/README.md @@ -2,11 +2,6 @@ This directory contains examples of some common use cases of gstreamer using the go bindings. -The common package provided to each example exports two methods. - - - `Run(f)` - This wraps the given function in a goroutine and wraps a GMainLoop around it. - - `RunLoop(f(loop))` - This simply creates (but does not start) a GMainLoop and passes it to the example to manage. - Each example can be run in one of two ways: ```bash @@ -16,4 +11,6 @@ go run /main.go [..args] # For multiple-file examples (but would also work for single file examples) cd && go build . ./ [..args] -``` \ No newline at end of file +``` + +See the plugins subdirectory to learn how to write custom elements in `go-gst` \ No newline at end of file diff --git a/examples/decodebin/main.go b/examples/decodebin/main.go index f29d113..766bd15 100644 --- a/examples/decodebin/main.go +++ b/examples/decodebin/main.go @@ -50,13 +50,9 @@ func buildPipeline() (*gst.Pipeline, error) { pipeline := gst.NewPipeline("") - src, ok := gst.ElementFactoryMake("filesrc", "").(*gst.Element) + src := gst.ElementFactoryMake("filesrc", "") - if !ok { - return nil, fmt.Errorf("could not create filesource") - } - - decodebin, ok := gst.ElementFactoryMake("decodebin", "").(*gst.Bin) + decodebin, ok := gst.ElementFactoryMake("decodebin", "").(*gst.Bin) // must cast since we need a weak reference if !ok { return nil, fmt.Errorf("could not create decodebin") } diff --git a/examples/discoverer/main.go b/examples/discoverer/main.go index 8028568..008ee49 100644 --- a/examples/discoverer/main.go +++ b/examples/discoverer/main.go @@ -15,13 +15,13 @@ import ( "os" "time" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/pbutils" + "github.com/go-gst/go-gst/pkg/gst" + "github.com/go-gst/go-gst/pkg/gstpbutils" ) func main() { - gst.Init(nil) + gst.Init() if len(os.Args) < 2 { fmt.Printf("USAGE: %s \n", os.Args[0]) @@ -30,7 +30,7 @@ func main() { uri := os.Args[1] - discoverer, err := pbutils.NewDiscoverer(gst.ClockTime(time.Second * 15)) + discoverer, err := gstpbutils.NewDiscoverer(gst.ClockTime(time.Second * 15)) if err != nil { fmt.Println("ERROR:", err) os.Exit(2) @@ -45,23 +45,23 @@ func main() { printDiscovererInfo(info) } -func printDiscovererInfo(info *pbutils.DiscovererInfo) { - fmt.Println("URI:", info.GetURI()) - fmt.Println("Duration:", info.GetDuration()) +func printDiscovererInfo(info *gstpbutils.DiscovererInfo) { + fmt.Println("URI:", info.URI()) + fmt.Println("Duration:", info.Duration()) printTags(info) - printStreamInfo(info.GetStreamInfo()) + printStreamInfo(info.StreamInfo()) - children := info.GetStreamList() + children := info.StreamList() fmt.Println("Children streams:") for _, child := range children { printStreamInfo(child) } } -func printTags(info *pbutils.DiscovererInfo) { +func printTags(info *gstpbutils.DiscovererInfo) { fmt.Println("Tags:") - tags := info.GetTags() + tags := info.Tags() if tags != nil { fmt.Println(" ", tags) return @@ -69,13 +69,13 @@ func printTags(info *pbutils.DiscovererInfo) { fmt.Println(" no tags") } -func printStreamInfo(info *pbutils.DiscovererStreamInfo) { +func printStreamInfo(info *gstpbutils.DiscovererStreamInfo) { if info == nil { return } fmt.Println("Stream: ") - fmt.Println(" Stream id:", info.GetStreamID()) - if caps := info.GetCaps(); caps != nil { + fmt.Println(" Stream id:", info.StreamID()) + if caps := info.Caps(); caps != nil { fmt.Println(" Format:", caps) } } diff --git a/examples/pad-probes/main.go b/examples/pad-probes/main.go index 10ff891..50d6ce3 100644 --- a/examples/pad-probes/main.go +++ b/examples/pad-probes/main.go @@ -12,49 +12,39 @@ package main import ( "errors" "fmt" - "math" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) -func padProbes(mainLoop *glib.MainLoop) error { - gst.Init(nil) +func main() { + gst.Init() // Parse the pipeline we want to probe from a static in-line string. // Here we give our audiotestsrc a name, so we can retrieve that element // from the resulting pipeline. - pipeline, err := gst.NewPipelineFromString( + ret, err := gst.ParseLaunch( "audiotestsrc name=src ! audio/x-raw,format=S16LE,channels=1 ! fakesink", ) if err != nil { - return err + panic("could not create pipeline") } + pipeline := ret.(*gst.Pipeline) + // Get the audiotestsrc element from the pipeline that GStreamer // created for us while parsing the launch syntax above. - // - // TODO: There are some discrepancies still between methods that check the nil - // value and return an error, versus those that will instead just return nil. - // Need to settle on one way or the other. - src, err := pipeline.GetElementByName("src") - if err != nil { - return err - } + src := pipeline.ByName("src").(*gst.Element) + // Get the audiotestsrc's src-pad. - srcPad := src.GetStaticPad("src") - if srcPad == nil { - return errors.New("src pad on src element was nil") - } + srcPad := src.StaticPad("src") // Add a probe handler on the audiotestsrc's src-pad. // This handler gets called for every buffer that passes the pad we probe. srcPad.AddProbe(gst.PadProbeTypeBuffer, func(self *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { // Interpret the data sent over the pad as a buffer. We know to expect this because of // the probe mask defined above. - buffer := info.GetBuffer() + buffer := info.Buffer() // At this point, buffer is only a reference to an existing memory region somewhere. // When we want to access its content, we have to map it while requesting the required @@ -63,24 +53,31 @@ func padProbes(mainLoop *glib.MainLoop) error { // on the machine's main memory itself, but rather in the GPU's memory. // So mapping the buffer makes the underlying memory region accessible to us. // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html - mapInfo := buffer.Map(gst.MapRead) - defer buffer.Unmap() + mapInfo, ok := buffer.Map(gst.MapRead) + + if !ok { + panic("could not map buffer") + } + + defer buffer.Unmap(mapInfo) + + // TODO: make mapInfo data accessible // We know what format the data in the memory region has, since we requested // it by setting the fakesink's caps. So what we do here is interpret the - // memory region we mapped as an array of signed 16 bit integers. - samples := mapInfo.AsInt16LESlice() - if len(samples) == 0 { - return gst.PadProbeOK - } + // // memory region we mapped as an array of signed 16 bit integers. + // samples := mapInfo + // if len(samples) == 0 { + // return gst.PadProbeOK + // } - // For each buffer (= chunk of samples) calculate the root mean square. - var square float64 - for _, i := range samples { - square += float64(i * i) - } - rms := math.Sqrt(square / float64(len(samples))) - fmt.Println("rms:", rms) + // // For each buffer (= chunk of samples) calculate the root mean square. + // var square float64 + // for _, i := range samples { + // square += float64(i * i) + // } + // rms := math.Sqrt(square / float64(len(samples))) + // fmt.Println("rms:", rms) return gst.PadProbeOK }) @@ -90,29 +87,25 @@ func padProbes(mainLoop *glib.MainLoop) error { // Block on messages coming in from the bus instead of using the main loop for { - msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone) + msg := pipeline.Bus().TimedPop(gst.ClockTimeNone) if msg == nil { break } if err := handleMessage(msg); err != nil { - return err + + fmt.Println(err) + return } } - - return nil } func handleMessage(msg *gst.Message) error { - defer msg.Unref() switch msg.Type() { - case gst.MessageEOS: + case gst.MessageEos: return errors.New("end-of-stream") case gst.MessageError: - return msg.ParseError() + err, _ := msg.ParseError() + return err } return nil } - -func main() { - examples.RunLoop(padProbes) -} diff --git a/examples/playbin/main.go b/examples/playbin/main.go index 231b2f8..88980f6 100644 --- a/examples/playbin/main.go +++ b/examples/playbin/main.go @@ -16,40 +16,46 @@ import ( "fmt" "os" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" + "github.com/diamondburned/gotk4/pkg/glib/v2" + "github.com/go-gst/go-gst/pkg/gst" ) -func playbin(mainLoop *glib.MainLoop) error { +func playbin() error { + gst.Init() + + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + if len(os.Args) < 2 { return errors.New("usage: playbin ") } - gst.Init(nil) + gst.Init() // Create a new playbin and set the URI on it - playbin, err := gst.NewElement("playbin") - if err != nil { - return err + ret := gst.ElementFactoryMake("playbin", "") + if ret != nil { + return fmt.Errorf("could not create playbin") } - playbin.Set("uri", os.Args[1]) + + playbin := ret.(*gst.Pipeline) + + playbin.SetObjectProperty("uri", os.Args[1]) // The playbin element itself is a pipeline, so it can be used as one, despite being // created from an element factory. - bus := playbin.GetBus() + bus := playbin.Bus() playbin.SetState(gst.StatePlaying) - bus.AddWatch(func(msg *gst.Message) bool { + bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool { switch msg.Type() { - case gst.MessageEOS: + case gst.MessageEos: mainLoop.Quit() return false case gst.MessageError: - err := msg.ParseError() + err, debug := msg.ParseError() fmt.Println("ERROR:", err.Error()) - if debug := err.DebugString(); debug != "" { + if debug != "" { fmt.Println("DEBUG") fmt.Println(debug) } @@ -57,33 +63,36 @@ func playbin(mainLoop *glib.MainLoop) error { return false // Watch state change events case gst.MessageStateChanged: - if _, newState := msg.ParseStateChanged(); newState == gst.StatePlaying { - bin := gst.ToGstBin(playbin) + if _, newState, _ := msg.ParseStateChanged(); newState == gst.StatePlaying { // Generate a dot graph of the pipeline to GST_DEBUG_DUMP_DOT_DIR if defined - bin.DebugBinToDotFile(gst.DebugGraphShowAll, "PLAYING") + gst.DebugBinToDotFile(&playbin.Bin, gst.DebugGraphShowAll, "PLAYING") } // Tag messages contain changes to tags on the stream. This can include metadata about // the stream such as codecs, artists, albums, etc. case gst.MessageTag: - tags := msg.ParseTags() + tags := msg.ParseTag() fmt.Println("Tags:") - if artist, ok := tags.GetString(gst.TagArtist); ok { + if artist, ok := tags.String(gst.TAG_ARTIST); ok { fmt.Println(" Artist:", artist) } - if album, ok := tags.GetString(gst.TagAlbum); ok { + if album, ok := tags.String(gst.TAG_ALBUM); ok { fmt.Println(" Album:", album) } - if title, ok := tags.GetString(gst.TagTitle); ok { + if title, ok := tags.String(gst.TAG_TITLE); ok { fmt.Println(" Title:", title) } } return true }) - return mainLoop.RunError() + mainLoop.Run() + + return nil } func main() { - examples.RunLoop(playbin) + if err := playbin(); err != nil { + fmt.Println(err) + } } diff --git a/examples/plugins/Makefile b/examples/plugins/Makefile deleted file mode 100644 index bbeafe0..0000000 --- a/examples/plugins/Makefile +++ /dev/null @@ -1,13 +0,0 @@ -PLUGINS ?= $(patsubst %/,%,$(sort $(dir $(wildcard */)))) - -all: $(PLUGINS) - -.PHONY: $(PLUGINS) -$(PLUGINS): - cd $@ && \ - go generate && \ - go build -o ../libgst$@.so -buildmode c-shared . - rm libgst$@.h - -clean: - rm -f *.so *.h \ No newline at end of file diff --git a/examples/plugins/README.md b/examples/plugins/README.md index 4ce022f..eed4b69 100644 --- a/examples/plugins/README.md +++ b/examples/plugins/README.md @@ -1,8 +1,3 @@ # Plugins -This directory contains examples of writing GStreamer plugins using `go-gst`. -The metadata required by GStreamer is generated via `go generate` with the code for the generator contained in this repo -at [`cmd/gst-plugin-gen`](../../cmd/gst-plugin-gen). - -The generator assumes the above is compiled and accessible in your PATH as `gst-plugin-gen`. -You can build and install it to your `GOPATH` by running `make install-plugin-gen` in the root of the repository. \ No newline at end of file +This directory contains examples of writing GStreamer plugins using `go-gst`. \ No newline at end of file diff --git a/examples/plugins/async-identity/asyncidentity.go b/examples/plugins/async-identity/asyncidentity.go deleted file mode 100644 index 74f0ef5..0000000 --- a/examples/plugins/async-identity/asyncidentity.go +++ /dev/null @@ -1,232 +0,0 @@ -//lint:file-ignore U1000 Ignore all unused code, this is example code - -// +plugin:Name=async-identity -// +plugin:Description=A go-gst example plugin with async state changes -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2024-09-13 -// -// +element:Name=asyncidentity -// +element:Rank=gst.RankNone -// +element:Impl=asyncidentity -// +element:Subclass=gst.ExtendsElement -// -//go:generate gst-plugin-gen -package main - -import ( - "fmt" - "sync/atomic" - "time" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" -) - -var ( - _cat = gst.NewDebugCategory( - "asyncidentity", - gst.DebugColorNone, - "asyncidentity element", - ) - - _srcPadTemplate = gst.NewPadTemplate("generic-src", gst.PadDirectionSource, - gst.PadPresenceAlways, gst.NewAnyCaps()) - _sinkPadTemplate = gst.NewPadTemplate("generic-sink", gst.PadDirectionSink, - gst.PadPresenceAlways, gst.NewAnyCaps()) - _properties = []*glib.ParamSpec{ - glib.NewUint64Param( - "delay", - "ns state change delay", - "Duration in nanoseconds to wait until a state changes", - _delayNsMin, _delayNsMax, _delayNsDefault, - glib.ParameterReadWrite, - ), - } -) - -const ( - _propDelayNs = 0 - - _delayNsMin = uint64(0) - _delayNsMax = uint64(time.Second) * 10 - _delayNsDefault = uint64(time.Second) -) - -func main() {} - -type asyncidentity struct { - // inner state - sinkpad *gst.Pad - srcpad *gst.Pad - - asyncPending atomic.Bool - - // property storage - delayNs atomic.Uint64 -} - -var _ glib.GoObjectSubclass = (*asyncidentity)(nil) - -func (g *asyncidentity) New() glib.GoObjectSubclass { return &asyncidentity{} } - -func (g *asyncidentity) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - class.SetMetadata( - "Async Identity Example", - "General", - "An async state changing identity like element", - "Artem Martus ", - ) - - class.AddStaticPadTemplate(_srcPadTemplate) - class.AddStaticPadTemplate(_sinkPadTemplate) - - class.InstallProperties(_properties) -} - -var _ glib.GoObject = (*asyncidentity)(nil) - -func (g *asyncidentity) SetProperty(obj *glib.Object, id uint, value *glib.Value) { - self := gst.ToElement(obj) - - switch id { - case _propDelayNs: - newDelayErased, err := value.GoValue() - if err != nil { - self.Error("Failed unmarshalling the 'delay' property", err) - return - } - newDelay, ok := newDelayErased.(uint64) - if !ok { - self.Error("Failed Go-casting the 'delay' interface{} into uint64", - fmt.Errorf("interfaced value: %+v", newDelayErased)) - return - } - oldDelay := g.delayNs.Swap(newDelay) - - self.Log(_cat, gst.LevelInfo, - fmt.Sprintf("Changed delay property %s => %s", - time.Duration(oldDelay), - time.Duration(newDelay), - )) - default: - self.Error("Tried to set unknown property", - fmt.Errorf("prop id %d: %s", id, value.TypeName())) - } -} - -func (g *asyncidentity) GetProperty(obj *glib.Object, id uint) *glib.Value { - var ( - out *glib.Value - err error - ) - - switch id { - case _propDelayNs: - out, err = glib.GValue(g.delayNs.Load()) - default: - err = fmt.Errorf("unknown property id: %d", id) - } - - if err != nil { - self := gst.ToElement(obj) - self.Error("Get property error", err) - out = nil - } - - return out -} - -func (g *asyncidentity) Constructed(self *glib.Object) { - elem := gst.ToElement(self) - srcPad := gst.NewPadFromTemplate(_srcPadTemplate, "src") - sinkPad := gst.NewPadFromTemplate(_sinkPadTemplate, "sink") - - sinkPad.SetChainFunction(g.sink_chain_function) - - // Have to set proxy flags on a pads - proxyFlags := gst.PadFlagProxyAllocation | gst.PadFlagProxyCaps | gst.PadFlagProxyScheduling - sinkPad.SetFlags(proxyFlags) - srcPad.SetFlags(proxyFlags) - - // Or setup query & event functions like so - - // sinkPad.SetQueryFunction(func(self *gst.Pad, parent *gst.Object, query *gst.Query) bool { - // return srcPad.PeerQuery(query) - // }) - // sinkPad.SetEventFunction(func(self *gst.Pad, parent *gst.Object, event *gst.Event) bool { - // return srcPad.PushEvent(event) - // }) - - // srcPad.SetQueryFunction(func(self *gst.Pad, parent *gst.Object, query *gst.Query) bool { - // return sinkPad.PeerQuery(query) - // }) - // srcPad.SetEventFunction(func(self *gst.Pad, parent *gst.Object, event *gst.Event) bool { - // return sinkPad.PushEvent(event) - // }) - - elem.AddPad(srcPad) - elem.AddPad(sinkPad) - - g.srcpad = srcPad - g.sinkpad = sinkPad - - g.delayNs.Store(_delayNsDefault) -} - -func (g *asyncidentity) sink_chain_function( - _self *gst.Pad, - _parent *gst.Object, - buffer *gst.Buffer, -) gst.FlowReturn { - - return g.srcpad.Push(buffer) -} - -// var _ gst.ElementImpl = (*asyncidentity)(nil) - -func (g *asyncidentity) ChangeState(el *gst.Element, transition gst.StateChange) gst.StateChangeReturn { - if ret := el.ParentChangeState(transition); ret == gst.StateChangeFailure { - return ret - } - - switch transition { - case gst.StateChangeNullToReady: - // async will be ignored due to target state <= READY - case gst.StateChangeReadyToPaused: - // async will be ignored due to no_preroll - return gst.StateChangeNoPreroll - case gst.StateChangePausedToPlaying: - fallthrough - case gst.StateChangePlayingToPaused: - g.asyncStateChange(el) - return gst.StateChangeAsync - case gst.StateChangePausedToReady: - // async will be ignored due to target state <= READY - case gst.StateChangeReadyToNull: - } - - // check against forcing state change - if g.asyncPending.Load() { - return gst.StateChangeAsync - } - - return gst.StateChangeSuccess -} - -func (g *asyncidentity) asyncStateChange(el *gst.Element) { - msg := gst.NewAsyncStartMessage(el) - _ = el.PostMessage(msg) - go func(el *gst.Element) { - g.asyncPending.Store(true) - delay := time.Duration(g.delayNs.Load()) - <-time.After(delay) - msg := gst.NewAsyncDoneMessage(el, gst.ClockTimeNone) - _ = el.PostMessage(msg) - g.asyncPending.Store(false) - }(el) -} diff --git a/examples/plugins/basetransform/internal/customtransform/element.go b/examples/plugins/basetransform/internal/customtransform/element.go index 8821ac6..fd768c8 100644 --- a/examples/plugins/basetransform/internal/customtransform/element.go +++ b/examples/plugins/basetransform/internal/customtransform/element.go @@ -1,68 +1,9 @@ package customtransform import ( - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" + "github.com/go-gst/go-gst/pkg/gstbase" ) -type customBaseTransform struct{} - -// ClassInit is the place where you define pads and properties -func (*customBaseTransform) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - class.SetMetadata( - "custom base transform", - "Transform/demo", - "custom base transform", - "Wilhelm Bartel ", - ) - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"), - )) - class.AddPadTemplate(gst.NewPadTemplate( - "sink", - gst.PadDirectionSink, - gst.PadPresenceAlways, - gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"), - )) -} - -// SetProperty gets called for every property. The id is the index in the slice defined above. -func (s *customBaseTransform) SetProperty(self *glib.Object, id uint, value *glib.Value) {} - -// GetProperty is called to retrieve the value of the property at index `id` in the properties -// slice provided at ClassInit. -func (o *customBaseTransform) GetProperty(self *glib.Object, id uint) *glib.Value { - return nil -} - -// New is called by the bindings to create a new instance of your go element. Use this to initialize channels, maps, etc. -// -// Think of New like the constructor of your struct -func (*customBaseTransform) New() glib.GoObjectSubclass { - return &customBaseTransform{} -} - -// InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called. -func (s *customBaseTransform) InstanceInit(instance *glib.Object) {} - -func (s *customBaseTransform) Constructed(o *glib.Object) {} - -func (s *customBaseTransform) Finalize(o *glib.Object) {} - -// see base.GstBaseTransformImpl interface for the method signatures of the virtual methods -// -// it is not required to implement all methods -var _ base.GstBaseTransformImpl = nil - -func (s *customBaseTransform) SinkEvent(self *base.GstBaseTransform, event *gst.Event) bool { - return self.ParentSinkEvent(event) -} - -func (s *customBaseTransform) SrcEvent(self *base.GstBaseTransform, event *gst.Event) bool { - return self.ParentSrcEvent(event) +type customBaseTransform struct { + gstbase.BaseTransform } diff --git a/examples/plugins/basetransform/internal/customtransform/register.go b/examples/plugins/basetransform/internal/customtransform/register.go index 980b198..8a15bf4 100644 --- a/examples/plugins/basetransform/internal/customtransform/register.go +++ b/examples/plugins/basetransform/internal/customtransform/register.go @@ -1,23 +1,51 @@ package customtransform import ( - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" + "github.com/diamondburned/gotk4/pkg/core/glib" + "github.com/go-gst/go-gst/pkg/gst" + "github.com/go-gst/go-gst/pkg/gstbase" ) // Register needs to be called after gst.Init() to make the gocustombin available in the standard // gst element registry. After this call the element can be used like any other gstreamer element func Register() bool { - return gst.RegisterElement( + registered := glib.RegisterSubclassWithConstructor[*customBaseTransform]( + func() *customBaseTransform { + return &customBaseTransform{} + }, + glib.WithOverrides[*customBaseTransform, gstbase.BaseTransformOverrides](func(b *customBaseTransform) gstbase.BaseTransformOverrides { + return gstbase.BaseTransformOverrides{} + }), + glib.WithClassInit[*gstbase.BaseTransformClass](func(class *gstbase.BaseTransformClass) { + class.ParentClass().SetStaticMetadata( + "custom base transform", + "Transform/demo", + "custom base transform", + "Wilhelm Bartel ", + ) + + class.ParentClass().AddPadTemplate(gst.NewPadTemplate( + "src", + gst.PadSrc, + gst.PadAlways, + gst.CapsFromString("audio/x-raw,channels=2,rate=48000"), + )) + class.ParentClass().AddPadTemplate(gst.NewPadTemplate( + "sink", + gst.PadSink, + gst.PadAlways, + gst.CapsFromString("audio/x-raw,channels=2,rate=48000"), + )) + }), + ) + + return gst.ElementRegister( // no plugin: nil, // The name of the element "gocustomtransform", // The rank of the element - gst.RankNone, - // The GoElement implementation for the element - &customBaseTransform{}, - // The base subclass this element extends - base.ExtendsBaseTransform, + uint(gst.RankNone), + registered.Type(), ) } diff --git a/examples/plugins/basetransform/main.go b/examples/plugins/basetransform/main.go index 892dec3..0cc0703 100644 --- a/examples/plugins/basetransform/main.go +++ b/examples/plugins/basetransform/main.go @@ -5,32 +5,33 @@ import ( "fmt" "os" "os/signal" + "time" "github.com/go-gst/go-gst/examples/plugins/basetransform/internal/customtransform" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) func run(ctx context.Context) error { ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) defer cancel() - gst.Init(nil) + gst.Init() customtransform.Register() - pipeline, err := gst.NewPipelineFromString("audiotestsrc ! gocustomtransform ! fakesink") + ret, err := gst.ParseLaunch("audiotestsrc ! gocustomtransform ! fakesink") if err != nil { return err } + pipeline := ret.(*gst.Pipeline) + pipeline.SetState(gst.StatePlaying) <-ctx.Done() - pipeline.BlockSetState(gst.StateNull) - - gst.Deinit() + pipeline.BlockSetState(gst.StateNull, gst.ClockTime(time.Second)) return ctx.Err() } diff --git a/examples/plugins/boilerplate/boilerplate.go b/examples/plugins/boilerplate/boilerplate.go deleted file mode 100644 index b37bee7..0000000 --- a/examples/plugins/boilerplate/boilerplate.go +++ /dev/null @@ -1,40 +0,0 @@ -//lint:file-ignore U1000 Ignore all unused code, this is example code - -// +plugin:Name=boilerplate -// +plugin:Description=My plugin written in go -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2021-01-18 -// -// +element:Name=myelement -// +element:Rank=gst.RankNone -// +element:Impl=myelement -// +element:Subclass=gst.ExtendsElement -// -//go:generate gst-plugin-gen -package main - -import ( - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" -) - -func main() {} - -type myelement struct{} - -func (g *myelement) New() glib.GoObjectSubclass { return &myelement{} } - -func (g *myelement) ClassInit(klass *glib.ObjectClass) { - // Set the plugin's longname as it is a basic requirement for a GStreamer plugin - class := gst.ToElementClass(klass) - class.SetMetadata( - "Boilerplate", - "General", - "An empty element which does nothing", - "Avi Zimmerman ", - ) -} diff --git a/examples/plugins/gobin/gobin.go b/examples/plugins/gobin/gobin.go deleted file mode 100644 index b6faaf0..0000000 --- a/examples/plugins/gobin/gobin.go +++ /dev/null @@ -1,41 +0,0 @@ -//lint:file-ignore U1000 Ignore all unused code, this is a work in progress - -// +plugin:Name=gobin -// +plugin:Description=A bin element written in go -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2021-01-18 -// -// +element:Name=gobin -// +element:Rank=gst.RankNone -// +element:Impl=gobin -// +element:Subclass=gst.ExtendsBin -// +element:Interfaces=gst.InterfaceChildProxy -// -//go:generate gst-plugin-gen -package main - -import ( - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" -) - -func main() {} - -type gobin struct{} - -func (g *gobin) New() glib.GoObjectSubclass { return &gobin{} } - -func (g *gobin) ClassInit(klass *glib.ObjectClass) { - // Set the plugin's longname as it is a basic requirement for a GStreamer plugin - class := gst.ToElementClass(klass) - class.SetMetadata( - "GoBin example", - "General", - "An empty GstBin element which does nothing", - "Avi Zimmerman ", - ) -} diff --git a/examples/plugins/gofilesink/filesink.go b/examples/plugins/gofilesink/filesink.go deleted file mode 100644 index ff43e92..0000000 --- a/examples/plugins/gofilesink/filesink.go +++ /dev/null @@ -1,285 +0,0 @@ -// This example demonstrates a filesink plugin implemented in Go. -// -// Every element in a Gstreamer pipeline is provided by plugins. Some are builtin while -// others are provided by third-parties or distributed privately. The plugins are built -// around the GObject type system. -// -// Go-gst offers loose bindings around the GObject type system to provide the necessary -// functionality to implement these plugins. The example in this code produces an element -// that can write to a file on the local system. -// -// In order to build the plugin for use by GStreamer, you can do the following: -// -// $ go generate -// $ go build -o libgstgofilesink.so -buildmode c-shared . -// -// +plugin:Name=gofilesink -// +plugin:Description=File plugins written in go -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2021-01-04 -// -// +element:Name=gofilesink -// +element:Rank=gst.RankNone -// +element:Impl=FileSink -// +element:Subclass=base.ExtendsBaseSink -// +element:Interfaces=gst.InterfaceURIHandler -// -//go:generate gst-plugin-gen -package main - -import ( - "errors" - "fmt" - "io" - "os" - "strings" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" -) - -// main is left unimplemented since these files are compiled to c-shared. -func main() {} - -// CAT is the log category for the gofilesink. It is safe to define GStreamer objects as globals -// without calling gst.Init, since in the context of a loaded plugin all initialization has -// already been taken care of by the loading application. -var CAT = gst.NewDebugCategory( - "gofilesink", - gst.DebugColorNone, - "GoFileSink Element", -) - -// Here we define a list of ParamSpecs that will make up the properties for our element. -// This element only has a single property, the location of the file to write to. -// When getting and setting properties later on, you will reference them by their index in -// this list. -var properties = []*glib.ParamSpec{ - glib.NewStringParam( - "location", // The name of the parameter - "File Location", // The long name for the parameter - "Location to write the file to", // A blurb about the parameter - nil, // A default value for the parameter - glib.ParameterReadWrite, // Flags for the parameter - ), -} - -// Here we declare a private struct to hold our internal state. -type state struct { - // Whether the element is started or not - started bool - // The file the element is writing to - file *os.File - // The current position in the file - position uint64 -} - -// This is another private struct where we hold the parameter values set on our -// element. -type settings struct { - location string -} - -// Finally a structure is defined that implements (at a minimum) the glib.GoObject interface. -// It is possible to signal to the bindings to inherit from other classes or implement other -// interfaces via the registration and TypeInit processes. -type FileSink struct { - // The settings for the element - settings *settings - // The current state of the element - state *state -} - -// setLocation is a simple method to check the validity of a provided file path and set the -// local value with it. -func (f *FileSink) setLocation(path string) error { - if f.state.started { - return errors.New("changing the `location` property on a started `GoFileSink` is not supported") - } - f.settings.location = strings.TrimPrefix(path, "file://") // should obviously use url.URL and do actual parsing - return nil -} - -// The ObjectSubclass implementations below are for registering the various aspects of our -// element and its capabilities with the type system. These are the minimum methods that -// should be implemented by an element. - -// Every element needs to provide its own constructor that returns an initialized glib.GoObjectSubclass -// implementation. Here we simply create a new fileSink with zeroed settings and state objects. -func (f *FileSink) New() glib.GoObjectSubclass { - CAT.Log(gst.LevelLog, "Initializing new fileSink object") - return &FileSink{ - settings: &settings{}, - state: &state{}, - } -} - -// The ClassInit method should specify the metadata for this element and add any pad templates -// and properties. -func (f *FileSink) ClassInit(klass *glib.ObjectClass) { - CAT.Log(gst.LevelLog, "Initializing gofilesink class") - class := gst.ToElementClass(klass) - class.SetMetadata( - "File Sink", - "Sink/File", - "Write stream to a file", - "Avi Zimmerman ", - ) - CAT.Log(gst.LevelLog, "Adding sink pad template and properties to class") - class.AddPadTemplate(gst.NewPadTemplate( - "sink", - gst.PadDirectionSink, - gst.PadPresenceAlways, - gst.NewAnyCaps(), - )) - class.InstallProperties(properties) -} - -// Object implementations are used during the initialization of an element. The -// methods are called once the object is constructed and its properties are read -// and written to. These and the rest of the methods described below are documented -// in interfaces in the bindings, however only individual methods needs from those -// interfaces need to be implemented. When left unimplemented, the behavior of the parent -// class is inherited. - -// SetProperty is called when a `value` is set to the property at index `id` in the -// properties slice that we installed during ClassInit. It should attempt to register -// the value locally or signal any errors that occur in the process. -func (f *FileSink) SetProperty(self *glib.Object, id uint, value *glib.Value) { - param := properties[id] - switch param.Name() { - case "location": - var val string - if value == nil { - val = "" - } else { - val, _ = value.GetString() - } - if err := f.setLocation(val); err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Could not set location on object: %s", err.Error()), - "", - ) - return - } - gst.ToElement(self).Log(CAT, gst.LevelInfo, fmt.Sprintf("Set `location` to %s", f.settings.location)) - } -} - -// GetProperty is called to retrieve the value of the property at index `id` in the properties -// slice provided at ClassInit. -func (f *FileSink) GetProperty(self *glib.Object, id uint) *glib.Value { - param := properties[id] - switch param.Name() { - case "location": - if f.settings.location == "" { - return nil - } - val, err := glib.GValue(f.settings.location) - if err == nil { - return val - } - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not convert %s to GValue", f.settings.location), - err.Error(), - ) - } - return nil -} - -// GstBaseSink implementations are optional methods to implement from the base.GstBaseSinkImpl interface. -// If the method is not overridden by the implementing struct, it will be inherited from the parent class. - -// Start is called to start the filesink. Open the file for writing and set the internal state. -func (f *FileSink) Start(self *base.GstBaseSink) bool { - if f.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is already started", "") - return false - } - - if f.settings.location == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "No location configured on the filesink", "") - return false - } - - destFile := f.settings.location - - var err error - f.state.file, err = os.Create(destFile) - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenWrite, - fmt.Sprintf("Could not open %s for writing", destFile), err.Error()) - return false - } - - self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Opened file %s for writing", destFile)) - - f.state.started = true - self.Log(CAT, gst.LevelInfo, "GoFileSink has started") - return true -} - -// Stop is called to stop the element. Set the internal state and close the file. -func (f *FileSink) Stop(self *base.GstBaseSink) bool { - if !f.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is not started", "") - return false - } - - if err := f.state.file.Close(); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Failed to close the destination file", err.Error()) - return false - } - - self.Log(CAT, gst.LevelInfo, "GoFileSink has stopped") - return true -} - -// Render is called when a buffer is ready to be written to the file. -func (f *FileSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn { - if !f.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSink is not started", "") - return gst.FlowError - } - - self.Log(CAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer at %v", buffer.Instance())) - newPos, err := io.Copy(f.state.file, buffer.Reader()) - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Error copying buffer to file", err.Error()) - return gst.FlowError - } - - f.state.position += uint64(newPos) - self.Log(CAT, gst.LevelTrace, fmt.Sprintf("New position in file: %v", f.state.position)) - - return gst.FlowOK -} - -// URIHandler implementations are the methods required by the GstURIHandler interface. - -// GetURI returns the currently configured URI -func (f *FileSink) GetURI() string { return fmt.Sprintf("file://%s", f.settings.location) } - -// GetURIType returns the types of URI this element supports. -func (f *FileSink) GetURIType() gst.URIType { return gst.URISource } - -// GetProtocols returns the protcols this element supports. -func (f *FileSink) GetProtocols() []string { return []string{"file"} } - -// SetURI should set the URI that this element is working on. -func (f *FileSink) SetURI(uri string) (bool, error) { - if uri == "file://" { - return true, nil - } - err := f.setLocation(uri) - if err != nil { - return false, err - } - CAT.Log(gst.LevelInfo, fmt.Sprintf("Set `location` to %s via URIHandler", f.settings.location)) - return true, nil -} diff --git a/examples/plugins/gofilesrc/filesrc.go b/examples/plugins/gofilesrc/filesrc.go deleted file mode 100644 index 09d7161..0000000 --- a/examples/plugins/gofilesrc/filesrc.go +++ /dev/null @@ -1,354 +0,0 @@ -// This example demonstrates a filesrc plugin implemented in Go. -// -// Every element in a Gstreamer pipeline is provided by plugins. Some are builtin while -// others are provided by third-parties or distributed privately. The plugins are built -// around the GObject type system. -// -// Go-gst offers loose bindings around the GObject type system to provide the necessary -// functionality to implement these plugins. The example in this code produces an element -// that can read from a file on the local system. -// -// In order to build the plugin for use by GStreamer, you can do the following: -// -// $ go generate -// $ go build -o libgstgofilesrc.so -buildmode c-shared . -// -// +plugin:Name=gofilesrc -// +plugin:Description=File plugins written in go -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2021-01-04 -// -// +element:Name=gofilesrc -// +element:Rank=gst.RankNone -// +element:Impl=FileSrc -// +element:Subclass=base.ExtendsBaseSrc -// +element:Interfaces=gst.InterfaceURIHandler -// -//go:generate gst-plugin-gen -package main - -import ( - "errors" - "fmt" - "io" - "os" - "strings" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" -) - -// main is left unimplemented since these files are compiled to c-shared. -func main() {} - -// CAT is the log category for the gofilesrc. It is safe to define GStreamer objects as globals -// without calling gst.Init, since in the context of a loaded plugin all initialization has -// already been taken care of by the loading application. -var CAT = gst.NewDebugCategory( - "gofilesrc", - gst.DebugColorNone, - "GoFileSrc Element", -) - -// Here we define a list of ParamSpecs that will make up the properties for our element. -// This element only has a single property, the location of the file to read from. -// When getting and setting properties later on, you will reference them by their index in -// this list. -var properties = []*glib.ParamSpec{ - glib.NewStringParam( - "location", // The name of the parameter - "File Location", // The long name for the parameter - "Location of the file to read from", // A blurb about the parameter - nil, // A default value for the parameter - glib.ParameterReadWrite, // Flags for the parameter - ), -} - -// Here we declare a private struct to hold our internal state. -type state struct { - // Whether the element is started or not - started bool - // The file the element is reading from - file *os.File - // The information about the file retrieved from stat - fileInfo os.FileInfo - // The current position in the file - position uint64 -} - -// This is another private struct where we hold the parameter values set on our -// element. -type settings struct { - location string -} - -// Finally a structure is defined that implements (at a minimum) the gst.GoElement interface. -// It is possible to signal to the bindings to inherit from other classes or implement other -// interfaces via the registration and TypeInit processes. -type FileSrc struct { - // The settings for the element - settings *settings - // The current state of the element - state *state -} - -// Private methods only used internally by the plugin - -// setLocation is a simple method to check the validity of a provided file path and set the -// local value with it. -func (f *FileSrc) setLocation(path string) error { - if f.state.started { - return errors.New("changing the `location` property on a started `GoFileSrc` is not supported") - } - f.settings.location = strings.TrimPrefix(path, "file://") // should obviously use url.URL and do actual parsing - return nil -} - -// The ObjectSubclass implementations below are for registering the various aspects of our -// element and its capabilities with the type system. These are the minimum methods that -// should be implemented by an element. - -// Every element needs to provide its own constructor that returns an initialized -// glib.GoObjectSubclass and state objects. -func (f *FileSrc) New() glib.GoObjectSubclass { - CAT.Log(gst.LevelLog, "Initializing new fileSrc object") - return &FileSrc{ - settings: &settings{}, - state: &state{}, - } -} - -// The ClassInit method should specify the metadata for this element and add any pad templates -// and properties. -func (f *FileSrc) ClassInit(klass *glib.ObjectClass) { - CAT.Log(gst.LevelLog, "Initializing gofilesrc class") - class := gst.ToElementClass(klass) - class.SetMetadata( - "File Source", - "Source/File", - "Read stream from a file", - "Avi Zimmerman ", - ) - CAT.Log(gst.LevelLog, "Adding src pad template and properties to class") - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewAnyCaps(), - )) - class.InstallProperties(properties) -} - -// Object implementations are used during the initialization of an element. The -// methods are called once the object is constructed and its properties are read -// and written to. These and the rest of the methods described below are documented -// in interfaces in the bindings, however only individual methods needs from those -// interfaces need to be implemented. When left unimplemented, the behavior of the parent -// class is inherited. - -// SetProperty is called when a `value` is set to the property at index `id` in the -// properties slice that we installed during ClassInit. It should attempt to register -// the value locally or signal any errors that occur in the process. -func (f *FileSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) { - param := properties[id] - switch param.Name() { - case "location": - var val string - if value == nil { - val = "" - } else { - val, _ = value.GetString() - } - if err := f.setLocation(val); err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Could not set location on object: %s", err.Error()), - "", - ) - return - } - gst.ToElement(self).Log(CAT, gst.LevelInfo, fmt.Sprintf("Set `location` to %s", f.settings.location)) - } -} - -// GetProperty is called to retrieve the value of the property at index `id` in the properties -// slice provided at ClassInit. -func (f *FileSrc) GetProperty(self *glib.Object, id uint) *glib.Value { - param := properties[id] - switch param.Name() { - case "location": - if f.settings.location == "" { - return nil - } - val, err := glib.GValue(f.settings.location) - if err == nil { - return val - } - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not convert %s to GValue", f.settings.location), - err.Error(), - ) - } - return nil -} - -// Constructed is called when the type system is done constructing the object. Any finalizations required -// during the initialization process can be performed here. In this example, we set the format on our -// underlying GstBaseSrc to bytes. -func (f *FileSrc) Constructed(self *glib.Object) { - base.ToGstBaseSrc(self).Log(CAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes") - base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes) -} - -// GstBaseSrc implementations are optional methods to implement from the base.GstBaseSrcImpl interface. -// If the method is not overridden by the implementing struct, it will be inherited from the parent class. - -// IsSeekable returns that we are, in fact, seekable. -func (f *FileSrc) IsSeekable(*base.GstBaseSrc) bool { return true } - -// GetSize will return the total size of the file at the configured location. -func (f *FileSrc) GetSize(self *base.GstBaseSrc) (bool, int64) { - if !f.state.started { - return false, 0 - } - return true, f.state.fileInfo.Size() -} - -// Start is called to start this element. In this example, the configured file is opened for reading, -// and any error encountered in the process is posted to the pipeline. -func (f *FileSrc) Start(self *base.GstBaseSrc) bool { - if f.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "GoFileSrc is already started", "") - return false - } - - if f.settings.location == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "File location is not defined", "") - return false - } - - stat, err := os.Stat(f.settings.location) - if err != nil { - if os.IsNotExist(err) { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("%s does not exist", f.settings.location), "") - return false - } - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("Could not stat %s, err: %s", f.settings.location, err.Error()), "") - return false - } - if stat.IsDir() { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("%s is a directory", f.settings.location), "") - return false - } - f.state.fileInfo = stat - self.Log(CAT, gst.LevelDebug, fmt.Sprintf("file stat - name: %s size: %d mode: %v modtime: %v", stat.Name(), stat.Size(), stat.Mode(), stat.ModTime())) - - self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Opening file %s for reading", f.settings.location)) - f.state.file, err = os.Open(f.settings.location) - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("Could not open file %s for reading", f.settings.location), err.Error()) - return false - } - - f.state.position = 0 - f.state.started = true - - self.StartComplete(gst.FlowOK) - - self.Log(CAT, gst.LevelInfo, "GoFileSrc has started") - return true -} - -// Stop is called to stop the element. The file is closed and the local values are zeroed out. -func (f *FileSrc) Stop(self *base.GstBaseSrc) bool { - if !f.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "FileSrc is not started", "") - return false - } - - if err := f.state.file.Close(); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, "Failed to close the source file", err.Error()) - return false - } - - f.state.file = nil - f.state.position = 0 - f.state.started = false - - self.Log(CAT, gst.LevelInfo, "GoFileSrc has stopped") - return true -} - -// Fill is called to fill a pre-allocated buffer with the data at offset up to the given size. -// Since we declared that we are seekable, we need to support the provided offset not necessarily matching -// where we currently are in the file. This is why we store the position in the file locally. -func (f *FileSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer *gst.Buffer) gst.FlowReturn { - if !f.state.started || f.state.file == nil { - self.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "Not started yet", "") - return gst.FlowError - } - - self.Log(CAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size)) - - if f.state.position != offset { - self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Seeking to new position at offset %v from previous position at offset %v", offset, f.state.position)) - if _, err := f.state.file.Seek(int64(offset), 0); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSeek, - fmt.Sprintf("Failed to seek to %d in file", offset), err.Error()) - return gst.FlowError - } - f.state.position = offset - } - - bufmap := buffer.Map(gst.MapWrite) - if bufmap == nil { - self.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to map buffer", "") - return gst.FlowError - } - defer buffer.Unmap() - - self.Log(CAT, gst.LevelLog, fmt.Sprintf("Reading %v bytes from offset %v in file into buffer at %v", size, f.state.position, bufmap.Data())) - if _, err := io.CopyN(bufmap.Writer(), f.state.file, int64(size)); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorRead, - fmt.Sprintf("Failed to read %d bytes from file at %d into buffer", size, offset), err.Error()) - return gst.FlowError - } - buffer.SetSize(int64(size)) - - f.state.position = f.state.position + uint64(size) - self.Log(CAT, gst.LevelLog, fmt.Sprintf("Incremented current position to %v", f.state.position)) - - return gst.FlowOK -} - -// URIHandler implementations are the methods required by the GstURIHandler interface. - -// GetURI returns the currently configured URI -func (f *FileSrc) GetURI() string { return fmt.Sprintf("file://%s", f.settings.location) } - -// GetURIType returns the types of URI this element supports. -func (f *FileSrc) GetURIType() gst.URIType { return gst.URISource } - -// GetProtocols returns the protcols this element supports. -func (f *FileSrc) GetProtocols() []string { return []string{"file"} } - -// SetURI should set the URI that this element is working on. -func (f *FileSrc) SetURI(uri string) (bool, error) { - if uri == "file://" { - return true, nil - } - err := f.setLocation(uri) - if err != nil { - return false, err - } - CAT.Log(gst.LevelInfo, fmt.Sprintf("Set `location` to %s via URIHandler", f.settings.location)) - return true, nil -} diff --git a/examples/plugins/minio/common.go b/examples/plugins/minio/common.go deleted file mode 100644 index 35400d6..0000000 --- a/examples/plugins/minio/common.go +++ /dev/null @@ -1,167 +0,0 @@ -package main - -import ( - "crypto/tls" - "crypto/x509" - "fmt" - "io/ioutil" - "net/http" - "os" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - minio "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" -) - -const ( - accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID" - secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY" -) - -var ( - defaultEndpoint = "play.min.io" - defaultUseTLS = true - defaultRegion = "us-east-1" - defaultInsecureSkipVerify = false -) - -type settings struct { - endpoint string - useTLS bool - region string - bucket string - key string - accessKeyID string - secretAccessKey string - insecureSkipVerify bool - caCertFile string - partSize uint64 -} - -func (s *settings) safestring() string { - return fmt.Sprintf("%+v", &settings{ - endpoint: s.endpoint, - useTLS: s.useTLS, - region: s.region, - bucket: s.bucket, - key: s.key, - insecureSkipVerify: s.insecureSkipVerify, - caCertFile: s.caCertFile, - }) -} - -func defaultSettings() *settings { - return &settings{ - endpoint: defaultEndpoint, - useTLS: defaultUseTLS, - region: defaultRegion, - accessKeyID: os.Getenv(accessKeyIDEnvVar), - secretAccessKey: os.Getenv(secretAccessKeyEnvVar), - insecureSkipVerify: defaultInsecureSkipVerify, - partSize: defaultPartSize, - } -} - -func getMinIOClient(settings *settings) (*minio.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - - if settings.useTLS { - if transport.TLSClientConfig == nil { - transport.TLSClientConfig = &tls.Config{} - } - if settings.caCertFile != "" { - certPool := x509.NewCertPool() - body, err := ioutil.ReadFile(settings.caCertFile) - if err != nil { - return nil, err - } - certPool.AppendCertsFromPEM(body) - transport.TLSClientConfig.RootCAs = certPool - } - transport.TLSClientConfig.InsecureSkipVerify = settings.insecureSkipVerify - } - return minio.New(settings.endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(settings.accessKeyID, settings.secretAccessKey, ""), - Secure: settings.useTLS, - Region: settings.region, - }) -} - -func setProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *settings, id uint, value *glib.Value) { - prop := properties[id] - - val, err := value.GoValue() - if err != nil { - elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Could not coerce %v to go value", value), err.Error()) - } - - switch prop.Name() { - case "endpoint": - settings.endpoint = val.(string) - case "use-tls": - settings.useTLS = val.(bool) - case "tls-skip-verify": - settings.insecureSkipVerify = val.(bool) - case "ca-cert-file": - settings.caCertFile = val.(string) - case "region": - settings.region = val.(string) - case "bucket": - settings.bucket = val.(string) - case "key": - settings.key = val.(string) - case "access-key-id": - settings.accessKeyID = val.(string) - case "secret-access-key": - settings.secretAccessKey = val.(string) - case "part-size": - settings.partSize = val.(uint64) - } -} - -func getProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *settings, id uint) *glib.Value { - prop := properties[id] - - var localVal interface{} - - switch prop.Name() { - case "endpoint": - localVal = settings.endpoint - case "use-tls": - localVal = settings.useTLS - case "tls-skip-verify": - localVal = settings.insecureSkipVerify - case "ca-cert-file": - localVal = settings.caCertFile - case "region": - localVal = settings.region - case "bucket": - localVal = settings.bucket - case "key": - localVal = settings.key - case "access-key-id": - localVal = settings.accessKeyID - case "secret-access-key": - localVal = "" - case "part-size": - localVal = settings.partSize - - default: - elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "") - return nil - } - - val, err := glib.GValue(localVal) - if err != nil { - elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not convert %v to GValue", localVal), - err.Error(), - ) - return nil - } - - return val -} diff --git a/examples/plugins/minio/go.mod b/examples/plugins/minio/go.mod deleted file mode 100644 index 9cbd242..0000000 --- a/examples/plugins/minio/go.mod +++ /dev/null @@ -1,29 +0,0 @@ -module github.com/go-gst/go-gst/examples/plugins/minio - -go 1.22 - -require ( - github.com/go-gst/go-glib v1.2.1 - github.com/go-gst/go-gst v1.2.1 - github.com/minio/minio-go/v7 v7.0.76 -) - -require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/go-ini/ini v1.67.0 // indirect - github.com/goccy/go-json v0.10.3 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect - github.com/klauspost/cpuid/v2 v2.2.8 // indirect - github.com/mattn/go-pointer v0.0.1 // indirect - github.com/minio/md5-simd v1.1.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rs/xid v1.6.0 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/examples/plugins/minio/go.sum b/examples/plugins/minio/go.sum deleted file mode 100644 index c53f6a6..0000000 --- a/examples/plugins/minio/go.sum +++ /dev/null @@ -1,45 +0,0 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/go-gst/go-glib v1.2.1 h1:ibAr5N1NmuHmZ5RaCFjFjeUy0Rk3t3LgvGutmwBeR9E= -github.com/go-gst/go-glib v1.2.1/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU= -github.com/go-gst/go-gst v1.2.1 h1:FqUFGFllbuC8LkQoqULgAui2ZS0VU1WEBCNekIMcBEE= -github.com/go-gst/go-gst v1.2.1/go.mod h1:OGPRsJdvNYCKjt3e4H4i8J6KVd2Wk5S2lzsEQ8mO1+g= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= -github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= -github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= -github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= -github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= -github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= -github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go/v7 v7.0.76 h1:9nxHH2XDai61cT/EFhyIw/wW4vJfpPNvl7lSFpRt+Ng= -github.com/minio/minio-go/v7 v7.0.76/go.mod h1:AVM3IUN6WwKzmwBxVdjzhH8xq+f57JSbbvzqvUzR6eg= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= -github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/plugins/minio/miniosink.go b/examples/plugins/minio/miniosink.go deleted file mode 100644 index cb1369f..0000000 --- a/examples/plugins/minio/miniosink.go +++ /dev/null @@ -1,225 +0,0 @@ -package main - -import ( - "fmt" - "io" - "os" - "strings" - "sync" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" -) - -var sinkCAT = gst.NewDebugCategory( - "miniosink", - gst.DebugColorNone, - "MinIOSink Element", -) - -type minioSink struct { - settings *settings - state *sinkstate - - writer *seekWriter - mux sync.Mutex -} - -type sinkstate struct { - started bool -} - -func (m *minioSink) New() glib.GoObjectSubclass { - srcCAT.Log(gst.LevelLog, "Creating new minioSink object") - return &minioSink{ - settings: defaultSettings(), - state: &sinkstate{}, - } -} - -func (m *minioSink) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - sinkCAT.Log(gst.LevelLog, "Initializing miniosink class") - class.SetMetadata( - "MinIO Sink", - "Sink/File", - "Write stream to a MinIO object", - "Avi Zimmerman ", - ) - sinkCAT.Log(gst.LevelLog, "Adding sink pad template and properties to class") - class.AddPadTemplate(gst.NewPadTemplate( - "sink", - gst.PadDirectionSink, - gst.PadPresenceAlways, - gst.NewAnyCaps(), - )) - class.InstallProperties(sinkProperties) -} - -func (m *minioSink) Constructed(obj *glib.Object) { base.ToGstBaseSink(obj).SetSync(false) } - -func (m *minioSink) SetProperty(self *glib.Object, id uint, value *glib.Value) { - setProperty(gst.ToElement(self), sinkProperties, m.settings, id, value) -} - -func (m *minioSink) GetProperty(self *glib.Object, id uint) *glib.Value { - return getProperty(gst.ToElement(self), sinkProperties, m.settings, id) -} - -func (m *minioSink) Query(self *base.GstBaseSink, query *gst.Query) bool { - switch query.Type() { - - case gst.QuerySeeking: - self.Log(sinkCAT, gst.LevelDebug, "Answering seeking query") - query.SetSeeking(gst.FormatTime, true, 0, -1) - return true - - } - return false -} - -func (m *minioSink) Start(self *base.GstBaseSink) bool { - m.mux.Lock() - defer m.mux.Unlock() - - if m.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, - "MinIOSink is already started", "") - return false - } - - if m.settings.bucket == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, - "No bucket configured on the miniosink", "") - return false - } - - if m.settings.key == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, - "No bucket configured on the miniosink", "") - return false - } - - self.Log(sinkCAT, gst.LevelDebug, m.settings.safestring()) - - if strings.HasPrefix(m.settings.accessKeyID, "env:") { - spl := strings.Split(m.settings.accessKeyID, "env:") - m.settings.accessKeyID = os.Getenv(spl[len(spl)-1]) - } - - if strings.HasPrefix(m.settings.secretAccessKey, "env:") { - spl := strings.Split(m.settings.secretAccessKey, "env:") - m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1]) - } - - self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Creating new MinIO client for %s", m.settings.endpoint)) - client, err := getMinIOClient(m.settings) - if err != nil { - self.Log(sinkCAT, gst.LevelError, err.Error()) - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, - fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error()) - return false - } - - self.Log(sinkCAT, gst.LevelInfo, "Initializing new MinIO writer") - m.writer = newSeekWriter(client, int64(m.settings.partSize), m.settings.bucket, m.settings.key) - - m.state.started = true - self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started") - return true -} - -func (m *minioSink) Stop(self *base.GstBaseSink) bool { - self.Log(sinkCAT, gst.LevelInfo, "Stopping MinIOSink") - m.mux.Lock() - defer m.mux.Unlock() - - if !m.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "") - return false - } - - m.writer = nil - m.state.started = false - - self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped") - return true -} - -func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn { - m.mux.Lock() - defer m.mux.Unlock() - - if !m.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "") - return gst.FlowError - } - - self.Log(sinkCAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer %v", buffer)) - - if _, err := m.writer.Write(buffer.Bytes()); err != nil { - self.Log(sinkCAT, gst.LevelError, err.Error()) - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, fmt.Sprintf("Failed to write data to minio buffer: %s", err.Error()), "") - return gst.FlowError - } - - return gst.FlowOK -} - -func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool { - - switch event.Type() { - - case gst.EventTypeSegment: - segment := event.ParseSegment() - - if segment.GetFormat() == gst.FormatBytes { - if uint64(m.writer.currentPosition) != segment.GetStart() { - m.mux.Lock() - self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Seeking to %d", segment.GetStart())) - if _, err := m.writer.Seek(int64(segment.GetStart()), io.SeekStart); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "") - m.mux.Unlock() - return false - } - m.mux.Unlock() - } else { - self.Log(sinkCAT, gst.LevelDebug, "Ignored SEGMENT, no seek needed") - } - } else { - self.Log(sinkCAT, gst.LevelDebug, fmt.Sprintf("Ignored SEGMENT event of format %s", segment.GetFormat().String())) - } - - case gst.EventTypeFlushStop: - self.Log(sinkCAT, gst.LevelInfo, "Flushing contents of writer and seeking back to start") - if m.writer.currentPosition != 0 { - m.mux.Lock() - if err := m.writer.flush(true); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, err.Error(), "") - m.mux.Unlock() - return false - } - if _, err := m.writer.Seek(0, io.SeekStart); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "") - m.mux.Unlock() - return false - } - m.mux.Unlock() - } - - case gst.EventTypeEOS: - self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing MinIO writer") - m.mux.Lock() - if err := m.writer.Close(); err != nil { - self.Log(sinkCAT, gst.LevelError, err.Error()) - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to close MinIO writer: %s", err.Error()), "") - m.mux.Unlock() - return false - } - m.mux.Unlock() - } - - return self.ParentEvent(event) - -} diff --git a/examples/plugins/minio/miniosrc.go b/examples/plugins/minio/miniosrc.go deleted file mode 100644 index 4462b9c..0000000 --- a/examples/plugins/minio/miniosrc.go +++ /dev/null @@ -1,211 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - "os" - "strings" - "sync" - - minio "github.com/minio/minio-go/v7" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" -) - -var srcCAT = gst.NewDebugCategory( - "miniosrc", - gst.DebugColorNone, - "MinIOSrc Element", -) - -type minioSrc struct { - settings *settings - state *srcstate -} - -type srcstate struct { - started bool - object *minio.Object - objInfo minio.ObjectInfo - - mux sync.Mutex -} - -func (m *minioSrc) New() glib.GoObjectSubclass { - srcCAT.Log(gst.LevelLog, "Creating new minioSrc object") - return &minioSrc{ - settings: defaultSettings(), - state: &srcstate{}, - } -} - -func (m *minioSrc) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - srcCAT.Log(gst.LevelLog, "Initializing miniosrc class") - class.SetMetadata( - "MinIO Source", - "Source/File", - "Read stream from a MinIO object", - "Avi Zimmerman ", - ) - srcCAT.Log(gst.LevelLog, "Adding src pad template and properties to class") - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewAnyCaps(), - )) - class.InstallProperties(srcProperties) -} - -func (m *minioSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) { - setProperty(gst.ToElement(self), srcProperties, m.settings, id, value) -} - -func (m *minioSrc) GetProperty(self *glib.Object, id uint) *glib.Value { - return getProperty(gst.ToElement(self), srcProperties, m.settings, id) -} - -func (m *minioSrc) Constructed(self *glib.Object) { - base.ToGstBaseSrc(self).Log(srcCAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes") - base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes) -} - -func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return true } - -func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) { - if !m.state.started { - return false, 0 - } - return true, m.state.objInfo.Size -} - -func (m *minioSrc) Start(self *base.GstBaseSrc) bool { - - if m.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "MinIOSrc is already started", "") - return false - } - - if m.settings.bucket == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No source bucket defined", "") - return false - } - - if m.settings.key == "" { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No object key defined", "") - return false - } - - m.state.mux.Lock() - - if strings.HasPrefix(m.settings.accessKeyID, "env:") { - spl := strings.Split(m.settings.accessKeyID, "env:") - m.settings.accessKeyID = os.Getenv(spl[len(spl)-1]) - } - - if strings.HasPrefix(m.settings.secretAccessKey, "env:") { - spl := strings.Split(m.settings.secretAccessKey, "env:") - m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1]) - } - - client, err := getMinIOClient(m.settings) - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, - fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error()) - m.state.mux.Unlock() - return false - } - - self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint)) - m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{}) - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("Failed to retrieve object %q from bucket %q", m.settings.key, m.settings.bucket), err.Error()) - m.state.mux.Unlock() - return false - } - - self.Log(srcCAT, gst.LevelInfo, "Getting HEAD for object") - m.state.objInfo, err = m.state.object.Stat() - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, - fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "") - m.state.mux.Unlock() - return false - } - self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo)) - - m.state.started = true - m.state.mux.Unlock() - - self.StartComplete(gst.FlowOK) - - self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has started") - return true -} - -func (m *minioSrc) Stop(self *base.GstBaseSrc) bool { - self.Log(srcCAT, gst.LevelInfo, "Stopping MinIOSrc") - m.state.mux.Lock() - defer m.state.mux.Unlock() - - if !m.state.started { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSrc is not started", "") - return false - } - - if err := m.state.object.Close(); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, "Failed to close the bucket object", err.Error()) - return false - } - - m.state.object = nil - m.state.started = false - - self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has stopped") - return true -} - -func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer *gst.Buffer) gst.FlowReturn { - - if !m.state.started || m.state.object == nil { - self.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "MinIOSrc is not started yet", "") - return gst.FlowError - } - - self.Log(srcCAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size)) - - m.state.mux.Lock() - defer m.state.mux.Unlock() - - data := make([]byte, size) - read, err := m.state.object.ReadAt(data, int64(offset)) - if err != nil && err != io.EOF { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorRead, - fmt.Sprintf("Failed to read %d bytes from object at offset %d", size, offset), err.Error()) - return gst.FlowError - } - - if read < int(size) { - self.Log(srcCAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read)) - trim := make([]byte, read) - copy(trim, data) - data = trim - } - - bufmap := buffer.Map(gst.MapWrite) - if bufmap == nil { - self.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to map buffer", "") - return gst.FlowError - } - defer buffer.Unmap() - - bufmap.WriteData(data) - buffer.SetSize(int64(read)) - - return gst.FlowOK -} diff --git a/examples/plugins/minio/plugin.go b/examples/plugins/minio/plugin.go deleted file mode 100644 index 35165e8..0000000 --- a/examples/plugins/minio/plugin.go +++ /dev/null @@ -1,71 +0,0 @@ -// This example demonstrates a src element that reads from objects in a minio bucket. -// Since minio implements the S3 API this plugin could also be used for S3 buckets by -// setting the correct endpoints and credentials. -// -// By default this plugin will use the credentials set in the environment at MINIO_ACCESS_KEY_ID -// and MINIO_SECRET_ACCESS_KEY however these can also be set on the element directly. -// -// In order to build the plugin for use by GStreamer, you can do the following: -// -// $ go build -o libgstminio.so -buildmode c-shared . -package main - -import "C" - -import ( - "unsafe" - - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/base" -) - -// The metadata for this plugin -var pluginMeta = &gst.PluginMetadata{ - MajorVersion: gst.VersionMajor, - MinorVersion: gst.VersionMinor, - Name: "minio-plugins", - Description: "GStreamer plugins for reading and writing from Minio", - Version: "v0.0.1", - License: gst.LicenseLGPL, - Source: "gst-pipeline-operator", - Package: "plugins", - Origin: "https://github.com/go-gst/gst-pipeline-operator", - ReleaseDate: "2021-01-12", - // The init function is called to register elements provided by the plugin. - Init: func(plugin *gst.Plugin) bool { - if ok := gst.RegisterElement( - plugin, - // The name of the element - "miniosrc", - // The rank of the element - gst.RankNone, - // The GoElement implementation for the element - &minioSrc{}, - // The base subclass this element extends - base.ExtendsBaseSrc, - ); !ok { - return ok - } - - if ok := gst.RegisterElement( - plugin, - // The name of the element - "miniosink", - // The rank of the element - gst.RankNone, - // The GoElement implementation for the element - &minioSink{}, - // The base subclass this element extends - base.ExtendsBaseSink, - ); !ok { - return ok - } - - return true - }, -} - -func main() {} - -//export gst_plugin_minio_get_desc -func gst_plugin_minio_get_desc() unsafe.Pointer { return pluginMeta.Export() } diff --git a/examples/plugins/minio/properties.go b/examples/plugins/minio/properties.go deleted file mode 100644 index 7f4b135..0000000 --- a/examples/plugins/minio/properties.go +++ /dev/null @@ -1,153 +0,0 @@ -package main - -import ( - "math" - - "github.com/go-gst/go-glib/glib" -) - -// Even though there is overlap in properties, they have to be declared twice. -// This is because the GType system doesn't allow for GObjects to share pointers -// to the exact same GParamSpecs. - -const defaultPartSize = 1024 * 1024 * 128 -const minPartSize = 1024 * 1024 * 5 - -var sinkProperties = []*glib.ParamSpec{ - glib.NewStringParam( - "endpoint", - "S3 API Endpoint", - "The endpoint for the S3 API server", - &defaultEndpoint, - glib.ParameterReadWrite, - ), - glib.NewBoolParam( - "use-tls", - "Use TLS", - "Use HTTPS for API requests", - defaultUseTLS, - glib.ParameterReadWrite, - ), - glib.NewBoolParam( - "tls-skip-verify", - "Disable TLS Verification", - "Don't verify the signature of the MinIO server certificate", - defaultInsecureSkipVerify, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "ca-cert-file", - "PEM CA Cert Bundle", - "A file containing a PEM certificate bundle to use to verify the MinIO certificate", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "region", - "Bucket region", - "The region where the bucket is", - &defaultRegion, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "bucket", - "Bucket name", - "The name of the MinIO bucket", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "key", - "Object key", - "The key of the object inside the bucket", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "access-key-id", - "Access Key ID", - "The access key ID to use for authentication", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "secret-access-key", - "Secret Access Key", - "The secret access key to use for authentication", - nil, - glib.ParameterReadWrite, - ), - glib.NewUint64Param( - "part-size", - "Part Size", - "Size for each part in the multi-part upload", - minPartSize, math.MaxInt64, defaultPartSize, - glib.ParameterReadWrite, - ), -} - -var srcProperties = []*glib.ParamSpec{ - glib.NewStringParam( - "endpoint", - "S3 API Endpoint", - "The endpoint for the S3 API server", - &defaultEndpoint, - glib.ParameterReadWrite, - ), - glib.NewBoolParam( - "use-tls", - "Use TLS", - "Use HTTPS for API requests", - defaultUseTLS, - glib.ParameterReadWrite, - ), - glib.NewBoolParam( - "tls-skip-verify", - "Disable TLS Verification", - "Don't verify the signature of the MinIO server certificate", - defaultInsecureSkipVerify, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "ca-cert-file", - "PEM CA Cert Bundle", - "A file containing a PEM certificate bundle to use to verify the MinIO certificate", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "region", - "Bucket region", - "The region where the bucket is", - &defaultRegion, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "bucket", - "Bucket name", - "The name of the MinIO bucket", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "key", - "Object key", - "The key of the object inside the bucket", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "access-key-id", - "Access Key ID", - "The access key ID to use for authentication. Use env: prefix to denote an environment variable.", - nil, - glib.ParameterReadWrite, - ), - glib.NewStringParam( - "secret-access-key", - "Secret Access Key", - "The secret access key to use for authentication. Use env: prefix to denote an environment variable.", - nil, - glib.ParameterReadWrite, - ), -} diff --git a/examples/plugins/minio/seek_writer.go b/examples/plugins/minio/seek_writer.go deleted file mode 100644 index 0dd45be..0000000 --- a/examples/plugins/minio/seek_writer.go +++ /dev/null @@ -1,190 +0,0 @@ -package main - -import ( - "bytes" - "context" - "crypto/sha256" - "fmt" - "io/ioutil" - "path" - - minio "github.com/minio/minio-go/v7" -) - -type seekWriter struct { - // The current position in the buffer - currentPosition int64 - // The size of each part to upload - partSize int64 - // A map of in memory parts to their content - parts map[int64][]byte - // A map of uploaded parts to the checksum at time of upload - uploadedParts map[int64]string - // A local reference to the minio client - client *minio.Client - bucket, key string -} - -func newSeekWriter(client *minio.Client, partsize int64, bucket, key string) *seekWriter { - return &seekWriter{ - currentPosition: 0, - partSize: partsize, - parts: make(map[int64][]byte), - uploadedParts: make(map[int64]string), - client: client, - bucket: bucket, key: key, - } -} - -func (s *seekWriter) Write(p []byte) (int, error) { - wrote, err := s.buffer(0, p) - if err != nil { - return wrote, err - } - return wrote, s.flush(false) -} - -func (s *seekWriter) Seek(offset int64, whence int) (int64, error) { - // Only needs to support SeekStart - s.currentPosition = offset - return s.currentPosition, nil -} - -func (s *seekWriter) Close() error { - if err := s.flush(true); err != nil { - return err - } - if len(s.uploadedParts) == 0 { - return nil - } - opts := make([]minio.CopySrcOptions, len(s.uploadedParts)) - for i := 0; i < len(opts); i++ { - opts[i] = minio.CopySrcOptions{ - Bucket: s.bucket, - Object: s.keyForPart(int64(i)), - } - } - _, err := s.client.ComposeObject(context.Background(), minio.CopyDestOptions{ - Bucket: s.bucket, - Object: s.key, - }, opts...) - - if err != nil { - return err - } - for _, opt := range opts { - if err := s.client.RemoveObject(context.Background(), opt.Bucket, opt.Object, minio.RemoveObjectOptions{}); err != nil { - return err - } - } - - return nil -} - -func (s *seekWriter) buffer(from int, p []byte) (int, error) { - currentPart := s.currentPosition / s.partSize - writeat := s.currentPosition % s.partSize - lenToWrite := int64(len(p)) - - var buf []byte - var ok bool - if buf, ok = s.parts[currentPart]; !ok { - if _, ok := s.uploadedParts[currentPart]; !ok { - s.parts[currentPart] = make([]byte, writeat+lenToWrite) - buf = s.parts[currentPart] - } else { - var err error - buf, err = s.fetchRemotePart(currentPart) - if err != nil { - return from, err - } - } - } - - if lenToWrite+writeat > s.partSize { - newbuf := make([]byte, s.partSize) - copy(newbuf, buf) - s.parts[currentPart] = newbuf - buf = newbuf - } else if lenToWrite+writeat > int64(len(buf)) { - newbuf := make([]byte, lenToWrite+writeat) - copy(newbuf, buf) - s.parts[currentPart] = newbuf - buf = newbuf - } - - wrote := copy(buf[writeat:], p) - - s.currentPosition += int64(wrote) - - if int64(wrote) != lenToWrite { - return s.buffer(from+wrote, p[wrote:]) - } - - return from + wrote, nil -} - -func (s *seekWriter) flush(all bool) error { - for part, buf := range s.parts { - if all || int64(len(buf)) == s.partSize { - if err := s.uploadPart(part, buf); err != nil { - return err - } - continue - } - if !all { - continue - } - if err := s.uploadPart(part, buf); err != nil { - return err - } - } - return nil -} - -func (s *seekWriter) uploadPart(part int64, data []byte) error { - h := sha256.New() - if _, err := h.Write(data); err != nil { - return err - } - datasum := fmt.Sprintf("%x", h.Sum(nil)) - if sum, ok := s.uploadedParts[part]; ok && sum == datasum { - return nil - } - _, err := s.client.PutObject(context.Background(), - s.bucket, s.keyForPart(part), - bytes.NewReader(data), int64(len(data)), - minio.PutObjectOptions{ - ContentType: "application/octet-stream", - }, - ) - if err != nil { - return err - } - delete(s.parts, part) - s.uploadedParts[part] = datasum - return nil -} - -func (s *seekWriter) fetchRemotePart(part int64) ([]byte, error) { - object, err := s.client.GetObject(context.Background(), s.bucket, s.keyForPart(part), minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - body, err := ioutil.ReadAll(object) - if err != nil { - return nil, err - } - s.parts[part] = body - return body, nil -} - -func (s *seekWriter) keyForPart(part int64) string { - if path.Dir(s.key) == "" { - return fmt.Sprintf("%s_tmp/%d", s.key, part) - } - return path.Join( - path.Dir(s.key), - fmt.Sprintf("%s_tmp/%d", path.Base(s.key), part), - ) -} diff --git a/examples/plugins/registered_elements/internal/common/assert.go b/examples/plugins/registered_elements/internal/common/assert.go deleted file mode 100644 index 1cd3d33..0000000 --- a/examples/plugins/registered_elements/internal/common/assert.go +++ /dev/null @@ -1,11 +0,0 @@ -package common - -import "fmt" - -var FinalizersCalled int = 0 - -func AssertFinalizersCalled(x int) { - if FinalizersCalled != x { - panic(fmt.Sprintf("finalizers did not run correctly, memory leak, wanted: %d, got: %d", x, FinalizersCalled)) - } -} diff --git a/examples/plugins/registered_elements/internal/common/util.go b/examples/plugins/registered_elements/internal/common/util.go deleted file mode 100644 index f3ecacb..0000000 --- a/examples/plugins/registered_elements/internal/common/util.go +++ /dev/null @@ -1,9 +0,0 @@ -package common - -func Must[T any](v T, err error) T { - if err != nil { - panic("got error:" + err.Error()) - } - - return v -} diff --git a/examples/plugins/registered_elements/internal/custombin/element.go b/examples/plugins/registered_elements/internal/custombin/element.go index 31db7f0..6d5e24d 100644 --- a/examples/plugins/registered_elements/internal/custombin/element.go +++ b/examples/plugins/registered_elements/internal/custombin/element.go @@ -3,85 +3,40 @@ package custombin import ( "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) type customBin struct { - // self *gst.Bin - source1 *gst.Element - source2 *gst.Element - mixer *gst.Element + gst.Bin // parent object must be first embedded field + source1 gst.Elementer + source2 gst.Elementer + mixer gst.Elementer } -// ClassInit is the place where you define pads and properties -func (*customBin) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - class.SetMetadata( - "custom test source", - "Src/Test", - "Demo source bin with volume", - "Wilhelm Bartel ", - ) - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewCapsFromString("audio/x-raw,channels=2,rate=48000"), - )) -} - -// SetProperty gets called for every property. The id is the index in the slice defined above. -func (s *customBin) SetProperty(self *glib.Object, id uint, value *glib.Value) {} - -// GetProperty is called to retrieve the value of the property at index `id` in the properties -// slice provided at ClassInit. -func (o *customBin) GetProperty(self *glib.Object, id uint) *glib.Value { - return nil -} - -// New is called by the bindings to create a new instance of your go element. Use this to initialize channels, maps, etc. -// -// Think of New like the constructor of your struct -func (*customBin) New() glib.GoObjectSubclass { - return &customBin{} -} - -// InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called. -func (s *customBin) InstanceInit(instance *glib.Object) { - self := gst.ToGstBin(instance) - - s.source1 = common.Must(gst.NewElementWithProperties("gocustomsrc", map[string]interface{}{ +// init should initialize the element. Keep in mind that the properties are not yet present. When this is called. +func (bin *customBin) init() { + bin.source1 = gst.ElementFactoryMakeWithProperties("gocustomsrc", map[string]interface{}{ "duration": int64(5 * time.Second), - })) - s.source2 = common.Must(gst.NewElementWithProperties("gocustomsrc", map[string]interface{}{ + }) + + bin.source2 = gst.ElementFactoryMakeWithProperties("gocustomsrc", map[string]interface{}{ "duration": int64(10 * time.Second), - })) + }) - s.mixer = common.Must(gst.NewElement("audiomixer")) + bin.mixer = gst.ElementFactoryMake("audiomixer", "") - klass := instance.Class() - class := gst.ToElementClass(klass) - - self.AddMany( - s.source1, - s.source2, - s.mixer, + bin.AddMany( + bin.source1, + bin.source2, + bin.mixer, ) - srcpad := s.mixer.GetStaticPad("src") + srcpad := bin.mixer.StaticPad("src") - ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, class.GetPadTemplate("src")) + ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, bin.PadTemplate("src")) - s.source1.Link(s.mixer) - s.source2.Link(s.mixer) + bin.source1.Link(bin.mixer) + bin.source2.Link(bin.mixer) - self.AddPad(ghostpad.Pad) -} - -func (s *customBin) Constructed(o *glib.Object) {} - -func (s *customBin) Finalize(o *glib.Object) { - common.FinalizersCalled++ + bin.AddPad(&ghostpad.Pad) } diff --git a/examples/plugins/registered_elements/internal/custombin/register.go b/examples/plugins/registered_elements/internal/custombin/register.go index e181f75..81ec66c 100644 --- a/examples/plugins/registered_elements/internal/custombin/register.go +++ b/examples/plugins/registered_elements/internal/custombin/register.go @@ -1,22 +1,45 @@ package custombin import ( - "github.com/go-gst/go-gst/gst" + "github.com/diamondburned/gotk4/pkg/core/glib" + "github.com/go-gst/go-gst/pkg/gst" ) // Register needs to be called after gst.Init() to make the gocustombin available in the standard // gst element registry. After this call the element can be used like any other gstreamer element func Register() bool { - return gst.RegisterElement( + registered := glib.RegisterSubclassWithConstructor[*customBin]( + func() *customBin { + return &customBin{} + }, + glib.WithOverrides[*customBin, gst.BinOverrides](func(b *customBin) gst.BinOverrides { + return gst.BinOverrides{} + }), + glib.WithClassInit[*gst.BinClass](func(bc *gst.BinClass) { + bc.ParentClass().SetStaticMetadata( + "custom test source", + "Src/Test", + "Demo source bin with volume", + "Wilhelm Bartel ", + ) + + bc.ParentClass().AddPadTemplate(gst.NewPadTemplate( + "src", + gst.PadSrc, + gst.PadAlways, + gst.CapsFromString("audio/x-raw,channels=2,rate=48000"), + )) + }), + ) + + return gst.ElementRegister( // no plugin: nil, // The name of the element "gocustombin", // The rank of the element - gst.RankNone, + uint(gst.RankNone), // The GoElement implementation for the element - &customBin{}, - // The base subclass this element extends - gst.ExtendsBin, + registered.Type(), ) } diff --git a/examples/plugins/registered_elements/internal/customsrc/element.go b/examples/plugins/registered_elements/internal/customsrc/element.go index d7de038..26abda0 100644 --- a/examples/plugins/registered_elements/internal/customsrc/element.go +++ b/examples/plugins/registered_elements/internal/customsrc/element.go @@ -1,13 +1,10 @@ package customsrc import ( - "fmt" "math" "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) // default: 1024, this value makes it easier to calculate num buffers with the sample rate @@ -15,125 +12,44 @@ const samplesperbuffer = 4800 const samplerate = 48000 -var properties = []*glib.ParamSpec{ - glib.NewInt64Param( - "duration", - "duration", - "duration the source", - 0, - math.MaxInt64, - 0, - glib.ParameterReadWrite, - ), -} - type customSrc struct { - // self *gst.Bin - source *gst.Element - volume *gst.Element + gst.Bin // parent must be embedded as the first field - duration time.Duration -} + source gst.Elementer + volume gst.Elementer -// ClassInit is the place where you define pads and properties -func (*customSrc) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - class.SetMetadata( - "custom test source", - "Src/Test", - "Demo source bin with volume", - "Wilhelm Bartel ", - ) - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewCapsFromString(fmt.Sprintf("audio/x-raw,channels=2,rate=%d", samplerate)), - )) - class.InstallProperties(properties) -} - -// SetProperty gets called for every property. The id is the index in the slice defined above. -func (s *customSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) { - param := properties[id] - - bin := gst.ToGstBin(self) - - switch param.Name() { - case "duration": - state := bin.GetCurrentState() - if !(state == gst.StateNull || state != gst.StateReady) { - return - } - - gv, _ := value.GoValue() - - durI, _ := gv.(int64) - - s.duration = time.Duration(durI) - - s.updateSource() - } -} - -// GetProperty is called to retrieve the value of the property at index `id` in the properties -// slice provided at ClassInit. -func (o *customSrc) GetProperty(self *glib.Object, id uint) *glib.Value { - param := properties[id] - - switch param.Name() { - case "duration": - v, _ := glib.GValue(int64(o.duration)) - return v - } - - return nil -} - -func (*customSrc) New() glib.GoObjectSubclass { - return &customSrc{} + Duration time.Duration `glib:"duration"` } // InstanceInit should initialize the element. Keep in mind that the properties are not yet present. When this is called. -func (s *customSrc) InstanceInit(instance *glib.Object) { - self := gst.ToGstBin(instance) +func (bin *customSrc) init() { + bin.source = gst.ElementFactoryMake("audiotestsrc", "") + bin.volume = gst.ElementFactoryMake("volume", "") - s.source = common.Must(gst.NewElement("audiotestsrc")) - s.volume = common.Must(gst.NewElement("volume")) - - klass := instance.Class() - class := gst.ToElementClass(klass) - - self.AddMany( - s.source, - s.volume, + bin.AddMany( + bin.source, + bin.volume, ) - srcpad := s.volume.GetStaticPad("src") + srcpad := bin.volume.StaticPad("src") - ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, class.GetPadTemplate("src")) + ghostpad := gst.NewGhostPadFromTemplate("src", srcpad, bin.PadTemplate("src")) - gst.ElementLinkMany( - s.source, - s.volume, + gst.LinkMany( + bin.source, + bin.volume, ) - self.AddPad(ghostpad.Pad) + bin.AddPad(&ghostpad.Pad) - s.updateSource() -} - -func (s *customSrc) Constructed(o *glib.Object) {} - -func (s *customSrc) Finalize(o *glib.Object) { - common.FinalizersCalled++ + bin.updateSource() } // updateSource will get called to update the audiotestsrc when a property changes func (s *customSrc) updateSource() { if s.source != nil { - numBuffers := (float64(s.duration / time.Second)) / (float64(samplesperbuffer) / float64(samplerate)) + numBuffers := (float64(s.Duration / time.Second)) / (float64(samplesperbuffer) / float64(samplerate)) - s.source.SetProperty("num-buffers", int(math.Ceil(numBuffers))) + s.source.SetObjectProperty("num-buffers", int(math.Ceil(numBuffers))) } } diff --git a/examples/plugins/registered_elements/internal/customsrc/register.go b/examples/plugins/registered_elements/internal/customsrc/register.go index 2a85ea2..aecc939 100644 --- a/examples/plugins/registered_elements/internal/customsrc/register.go +++ b/examples/plugins/registered_elements/internal/customsrc/register.go @@ -1,22 +1,44 @@ package customsrc import ( - "github.com/go-gst/go-gst/gst" + "github.com/diamondburned/gotk4/pkg/core/glib" + "github.com/go-gst/go-gst/pkg/gst" ) // Register needs to be called after gst.Init() to make the gocustomsrc available in the standard // gst element registry. After this call the element can be used like any other gstreamer element func Register() bool { - return gst.RegisterElement( + registered := glib.RegisterSubclassWithConstructor[*customSrc]( + func() *customSrc { + return &customSrc{} + }, + glib.WithOverrides[*customSrc, gst.BinOverrides](func(b *customSrc) gst.BinOverrides { + return gst.BinOverrides{} + }), + glib.WithClassInit[*gst.BinClass](func(bc *gst.BinClass) { + bc.ParentClass().SetStaticMetadata( + "custom test source", + "Src/Test", + "Demo source bin with volume", + "Wilhelm Bartel ", + ) + + bc.ParentClass().AddPadTemplate(gst.NewPadTemplate( + "src", + gst.PadSrc, + gst.PadAlways, + gst.CapsFromString("audio/x-raw,channels=2,rate=48000"), + )) + }), + ) + + return gst.ElementRegister( // no plugin: nil, // The name of the element "gocustomsrc", // The rank of the element - gst.RankNone, - // The GoElement implementation for the element - &customSrc{}, - // The base subclass this element extends - gst.ExtendsBin, + uint(gst.RankNone), + registered.Type(), ) } diff --git a/examples/plugins/registered_elements/main.go b/examples/plugins/registered_elements/main.go index 1440045..ea766b2 100644 --- a/examples/plugins/registered_elements/main.go +++ b/examples/plugins/registered_elements/main.go @@ -6,13 +6,12 @@ import ( "os" "os/signal" "path/filepath" - "runtime/pprof" + "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/common" + "github.com/diamondburned/gotk4/pkg/glib/v2" "github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/custombin" "github.com/go-gst/go-gst/examples/plugins/registered_elements/internal/customsrc" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) func run(ctx context.Context) error { @@ -25,32 +24,34 @@ func run(ctx context.Context) error { return err } - gst.Init(nil) + gst.Init() customsrc.Register() custombin.Register() - systemclock := gst.ObtainSystemClock() + systemclock := gst.SystemClockObtain() - pipeline, err := gst.NewPipelineFromString("gocustombin ! fakesink sync=true") + ret, err := gst.ParseLaunch("gocustombin ! fakesink sync=true") if err != nil { return err } - pipeline.ForceClock(systemclock.Clock) + pipeline := ret.(*gst.Pipeline) - bus := pipeline.GetBus() + pipeline.UseClock(systemclock) + + bus := pipeline.Bus() mainloop := glib.NewMainLoop(glib.MainContextDefault(), false) pipeline.SetState(gst.StatePlaying) - bus.AddWatch(func(msg *gst.Message) bool { + bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool { switch msg.Type() { case gst.MessageStateChanged: - old, new := msg.ParseStateChanged() - dot := pipeline.DebugBinToDotData(gst.DebugGraphShowVerbose) + old, new, _ := msg.ParseStateChanged() + dot := gst.DebugBinToDotData(&pipeline.Bin, gst.DebugGraphShowVerbose) f, err := os.OpenFile(filepath.Join(wd, fmt.Sprintf("pipeline-%s-to-%s.dot", old, new)), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0600) @@ -69,14 +70,12 @@ func run(ctx context.Context) error { return false } - case gst.MessageEOS: - fmt.Println(msg.String()) + case gst.MessageEos: + fmt.Println("reached EOS") cancel() return false } - // the String method is expensive and should not be used in prodution: - fmt.Println(msg.String()) return true }) @@ -86,7 +85,7 @@ func run(ctx context.Context) error { mainloop.Quit() - pipeline.BlockSetState(gst.StateNull) + pipeline.BlockSetState(gst.StateNull, gst.ClockTime(time.Second)) gst.Deinit() @@ -101,14 +100,4 @@ func main() { if err != nil { fmt.Fprintln(os.Stderr, err) } - - // this is very helpful to find memory leaks, see github.com/go-gst/asanutils - // asanutils.CheckLeaks() - - prof := pprof.Lookup("go-glib-reffed-objects") - - prof.WriteTo(os.Stdout, 1) - - // we are creating 3 custom elements in total. If this panics, then the go struct will memory leak - common.AssertFinalizersCalled(3) } diff --git a/examples/plugins/websocketsrc/go.mod b/examples/plugins/websocketsrc/go.mod deleted file mode 100644 index 058e834..0000000 --- a/examples/plugins/websocketsrc/go.mod +++ /dev/null @@ -1,14 +0,0 @@ -module github.com/go-gst/go-gst/examples/plugins/websocketsrc - -go 1.22 - -require ( - github.com/go-gst/go-glib v1.2.1 - github.com/go-gst/go-gst v1.2.1 - golang.org/x/net v0.29.0 -) - -require ( - github.com/mattn/go-pointer v0.0.1 // indirect - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect -) diff --git a/examples/plugins/websocketsrc/go.sum b/examples/plugins/websocketsrc/go.sum deleted file mode 100644 index 0f77667..0000000 --- a/examples/plugins/websocketsrc/go.sum +++ /dev/null @@ -1,10 +0,0 @@ -github.com/go-gst/go-glib v1.2.1 h1:ibAr5N1NmuHmZ5RaCFjFjeUy0Rk3t3LgvGutmwBeR9E= -github.com/go-gst/go-glib v1.2.1/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU= -github.com/go-gst/go-gst v1.2.1 h1:FqUFGFllbuC8LkQoqULgAui2ZS0VU1WEBCNekIMcBEE= -github.com/go-gst/go-gst v1.2.1/go.mod h1:OGPRsJdvNYCKjt3e4H4i8J6KVd2Wk5S2lzsEQ8mO1+g= -github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= -github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= -golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= diff --git a/examples/plugins/websocketsrc/websocketsrc.go b/examples/plugins/websocketsrc/websocketsrc.go deleted file mode 100644 index 12a28e2..0000000 --- a/examples/plugins/websocketsrc/websocketsrc.go +++ /dev/null @@ -1,552 +0,0 @@ -// This is a GStreamer element implemented in Go that uses inbound data on a websocket -// connection as the source for the stream. -// -// In order to build the plugin for use by GStreamer, you can do the following: -// -// $ go generate -// $ go build -o libgstwebsocketsrc.so -buildmode c-shared . -// -// +plugin:Name=websocketsrc -// +plugin:Description=GStreamer Websocket Source -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/go-gst/go-gst -// +plugin:ReleaseDate=2021-01-10 -// -// +element:Name=websocketsrc -// +element:Rank=gst.RankNone -// +element:Impl=websocketSrc -// +element:Subclass=gst.ExtendsElement -// -//go:generate gst-plugin-gen -package main - -import ( - "context" - "fmt" - "net/http" - "sync" - "time" - - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "golang.org/x/net/websocket" -) - -// MaxPayloadSize to accept over websocket connections. Also the size of buffers. -const MaxPayloadSize = 1024 - -// Defaults // -var ( - DefaultAddress string = "0.0.0.0" - DefaultPort int = 5000 - DefaultRetrieveRemoteAddr bool = true -) - -func main() {} - -// CAT is the log category for the websocketsrc. -var CAT = gst.NewDebugCategory( - "websocketsrc", - gst.DebugColorNone, - "WebsocketSrc Element", -) - -var properties = []*glib.ParamSpec{ - glib.NewStringParam( - "address", - "Server Address", - "The address to bind the server to", - &DefaultAddress, - glib.ParameterReadWrite, - ), - glib.NewIntParam( - "port", - "Server Port", - "The port to bind the server to", - 1024, 65535, - DefaultPort, - glib.ParameterReadWrite, - ), - // not implemented yet - glib.NewBoolParam( - "retrieve-remote-addr", - "Retrieve Remote Address", - "Include the remote client's address in the buffer metadata", - DefaultRetrieveRemoteAddr, - glib.ParameterReadWrite, - ), -} - -// Internals // - -// A private settings struct to hold the values of the above parameters -type settings struct { - address string - port int - retrieveRemoteAddr bool -} - -// Helper function to retrieve a settings object set to the default values. -func defaultSettings() *settings { - return &settings{ - address: DefaultAddress, - port: DefaultPort, - retrieveRemoteAddr: DefaultRetrieveRemoteAddr, - } -} - -// The internal state object -type state struct { - serverStarted, channelsStarted, sentInitialEvents, sentSegment bool - server *http.Server - srcpad *gst.Pad - bufferpool *gst.BufferPool - bufferchan chan []byte - stopchan chan struct{} - - mux sync.Mutex - connmux sync.Mutex -} - -// Base struct definition for the websocket src -type websocketSrc struct { - settings *settings - state *state -} - -// prepare verifies the src pad has been added to the element, and then sets up server -// handlers and a buffer pool -func (w *websocketSrc) prepare(elem *gst.Element) error { - w.state.mux.Lock() - defer w.state.mux.Unlock() - - // Make sure we have a srcpad - if w.state.srcpad == nil { - w.setupSrcPad(elem) - } - - elem.Log(CAT, gst.LevelDebug, "Creating channels for goroutines") - - // Setup a channel for handling buffers - w.state.bufferchan = make(chan []byte) - w.state.stopchan = make(chan struct{}) - - elem.Log(CAT, gst.LevelDebug, "Setting up the HTTP server") - - // Setup the HTTP server instance - w.state.server = &http.Server{ - Addr: fmt.Sprintf("%s:%d", w.settings.address, w.settings.port), - ReadTimeout: 300 * time.Second, - WriteTimeout: 300 * time.Second, - Handler: &websocket.Server{ - // Don't check the Origin header - Handshake: func(*websocket.Config, *http.Request) error { return nil }, - Handler: func(conn *websocket.Conn) { - elem.Log(CAT, gst.LevelInfo, fmt.Sprintf("Received new connection from: %s", conn.Request().RemoteAddr)) - - // Only allow a stream from one client at a time - w.state.connmux.Lock() - defer w.state.connmux.Unlock() - - conn.PayloadType = websocket.BinaryFrame - conn.MaxPayloadBytes = MaxPayloadSize - - for { - // Read the PayloadSize into a bytes slice - buf := make([]byte, conn.MaxPayloadBytes) - size, err := conn.Read(buf) - if err != nil { - elem.ErrorMessage(gst.DomainStream, gst.StreamErrorFailed, "Error reading bytes from client", err.Error()) - return - } - - // The goroutine listening for buffers will use the size to determine offsets, - // So trim the zeroes if we receive a buffer less than the requested size. - if size < conn.MaxPayloadBytes { - trimmed := make([]byte, size) - copy(trimmed, buf) - buf = trimmed - } - - // Queue the buffer for processing - elem.Log(CAT, gst.LevelLog, fmt.Sprintf("Queueing %d bytes for processing", len(buf))) - w.state.bufferchan <- buf - } - }, - }, - } - - elem.Log(CAT, gst.LevelDebug, "Configuring a buffer pool") - - // Configure a buffer pool - w.state.bufferpool = gst.NewBufferPool() - cfg := w.state.bufferpool.GetConfig() - cfg.SetParams(nil, MaxPayloadSize, 0, 0) - w.state.bufferpool.SetConfig(cfg) - w.state.bufferpool.SetActive(true) - - return nil -} - -// This runs in a goroutine and checks for pause events or new buffers to push onto the pad. -func (w *websocketSrc) watchChannels(elem *gst.Element) { - for { - select { - - case data, more := <-w.state.bufferchan: - if !more { - elem.Log(CAT, gst.LevelInfo, "Buffer channel has closed, stopping processing") - return - } - elem.Log(CAT, gst.LevelDebug, "Retrieving buffer from the pool") - - buf, ret := w.state.bufferpool.AcquireBuffer(nil) - if ret != gst.FlowOK { - elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, - fmt.Sprintf("Could not allocate buffer for data: %s", ret), "") - return - } - - elem.Log(CAT, gst.LevelDebug, "Writing data to buffer") - buf.Map(gst.MapWrite).WriteData(data) - buf.Unmap() - buf.SetSize(int64(len(data))) - - elem.Log(CAT, gst.LevelDebug, "Pushing buffer onto src pad") - w.pushPrelude(elem) - if ret := w.state.srcpad.Push(buf); ret == gst.FlowError { - elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, - fmt.Sprintf("Failed to push buffer to srcpad: %s", ret), "") - return - } - - case <-w.state.stopchan: - elem.Log(CAT, gst.LevelInfo, "Received signal on stopchan to halt buffer processing") - return - - } - } -} - -// start will start the websocket server and the buffer processing goroutines. -func (w *websocketSrc) start(elem *gst.Element) { - w.state.mux.Lock() - defer w.state.mux.Unlock() - - if !w.state.serverStarted { - elem.Log(CAT, gst.LevelInfo, "Starting the HTTP server") - go w.startServer(elem) - w.state.serverStarted = true - } - if !w.state.channelsStarted { - elem.Log(CAT, gst.LevelInfo, "Starting channel goroutine") - go w.watchChannels(elem) - w.state.channelsStarted = true - } - elem.Log(CAT, gst.LevelInfo, "WebsocketSrc has started") -} - -// starts the server, is called as a goroutine. -func (w *websocketSrc) startServer(elem *gst.Element) { - if err := w.state.server.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - elem.Log(CAT, gst.LevelInfo, "Server exited cleanly") - return - } - elem.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "Failed to start websocket server", err.Error()) - } -} - -// Checks if initial stream events were sent and pushes them onto the pad if needed. -func (w *websocketSrc) pushPrelude(elem *gst.Element) { - w.state.mux.Lock() - defer w.state.mux.Unlock() - - if !w.state.sentInitialEvents { - elem.Log(CAT, gst.LevelDebug, "Sending stream start event") - - streamid := "blahblahblah" - ev := gst.NewStreamStartEvent(streamid) - if res := w.state.srcpad.PushEvent(ev); !res { - elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to notify elements of stream start", "") - return - } - w.state.sentInitialEvents = true - } - if !w.state.sentSegment { - elem.Log(CAT, gst.LevelDebug, "Sending new segment event") - - ev := gst.NewSegmentEvent(gst.NewFormattedSegment(gst.FormatTime)) - if res := w.state.srcpad.PushEvent(ev); !res { - elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to notify elements of new segment", "") - return - } - w.state.sentSegment = true - } -} - -// Stops the goroutines and the websocket server -func (w *websocketSrc) stop(elem *gst.Element) { - w.state.mux.Lock() - defer w.state.mux.Unlock() - - if w.state.channelsStarted { - elem.Log(CAT, gst.LevelInfo, "Sending stop signal to go routines") - w.state.stopchan <- struct{}{} - w.state.channelsStarted = false - } - - if w.state.serverStarted { - elem.Log(CAT, gst.LevelInfo, "Shutting down HTTP server") - w.state.server.Shutdown(context.Background()) - w.state.serverStarted = false - } -} - -// Just stops the buffer processing routine, but leaves the server running -func (w *websocketSrc) pause(elem *gst.Element) { - w.state.mux.Lock() - defer w.state.mux.Unlock() - elem.Log(CAT, gst.LevelDebug, "Sending stop signal to go routines") - w.state.stopchan <- struct{}{} - w.state.channelsStarted = false -} - -// Tears down all resources for the element. -func (w *websocketSrc) unprepare(elem *gst.Element) { - w.state.mux.Lock() - defer w.state.mux.Unlock() - - elem.Log(CAT, gst.LevelDebug, "Freeing pads and buffers") - - w.state.bufferpool.SetActive(false) - w.state.bufferpool.Unref() - - elem.Log(CAT, gst.LevelDebug, "Closing channels and clearing state") - - close(w.state.bufferchan) - close(w.state.stopchan) - w.state = &state{} -} - -// Sets up a src pad for an element and adds the necessary callbacks. -func (w *websocketSrc) setupSrcPad(elem *gst.Element) { - // Configure the src pad - elem.Log(CAT, gst.LevelDebug, "Configuring the src pad") - - w.state.srcpad = gst.NewPadFromTemplate(elem.GetPadTemplates()[0], "src") - elem.AddPad(w.state.srcpad) - - // Set a function for handling events - w.state.srcpad.SetEventFunction(func(pad *gst.Pad, parent *gst.Object, event *gst.Event) bool { - var ret bool - - pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Handling event: %s", event.Type())) - - switch event.Type() { - case gst.EventTypeReconfigure: - ret = true - case gst.EventTypeLatency: - ret = true - default: - ret = false - } - - if ret { - pad.Log(CAT, gst.LevelDebug, fmt.Sprintf("Handled event: %s", event.Type())) - } else { - pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Didn't handle event: %s", event.Type())) - } - - return ret - }) - - // Set a query handler for the src pad - w.state.srcpad.SetQueryFunction(func(pad *gst.Pad, parent *gst.Object, query *gst.Query) bool { - var ret bool - - pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Handling query: %s", query.Type())) - - switch query.Type() { - case gst.QueryLatency: - query.SetLatency(true, 0, gst.ClockTimeNone) - ret = true - case gst.QueryScheduling: - query.SetScheduling(gst.SchedulingFlagSequential, 1, -1, 0) - query.AddSchedulingMode(gst.PadModePush) - ret = true - case gst.QueryCaps: - query.SetCapsResult(gst.NewAnyCaps()) - ret = true - default: - ret = false - } - - if ret { - pad.Log(CAT, gst.LevelDebug, fmt.Sprintf("Handled query: %s", query.Type())) - } else { - pad.Log(CAT, gst.LevelLog, fmt.Sprintf("Didn't handle query: %s", query.Type())) - } - - return ret - }) -} - -// * ObjectSubclass * // - -func (w *websocketSrc) New() glib.GoObjectSubclass { - return &websocketSrc{ - settings: defaultSettings(), - state: &state{}, - } -} - -func (w *websocketSrc) ClassInit(klass *glib.ObjectClass) { - class := gst.ToElementClass(klass) - class.SetMetadata( - "Websocket Src", - "Src/Websocket", - "Write stream from a connection over a websocket server", - "Avi Zimmerman ", - ) - class.AddPadTemplate(gst.NewPadTemplate( - "src", - gst.PadDirectionSource, - gst.PadPresenceAlways, - gst.NewAnyCaps(), - )) - class.InstallProperties(properties) -} - -// * Object * // -func (w *websocketSrc) SetProperty(self *glib.Object, id uint, value *glib.Value) { - prop := properties[id] - - switch prop.Name() { - case "address": - val, err := value.GetString() - if err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - "Could not get string from GValue", - err.Error(), - ) - return - } - w.settings.address = val - case "port": - val, err := value.GoValue() - if err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - "Could not get go value from GValue", - err.Error(), - ) - return - } - intval, ok := val.(int) - if !ok { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not coerce govalue %v to integer", val), - err.Error(), - ) - return - } - w.settings.port = intval - case "retrieve-remote-addr": - val, err := value.GoValue() - if err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - "Could not get go value from GValue", - err.Error(), - ) - return - } - boolval, ok := val.(bool) - if !ok { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not coerce govalue %v to bool", val), - err.Error(), - ) - return - } - w.settings.retrieveRemoteAddr = boolval - default: - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Cannot set invalid property %s", prop.Name()), "") - - } -} - -func (w *websocketSrc) GetProperty(self *glib.Object, id uint) *glib.Value { - prop := properties[id] - - var localVal interface{} - - switch prop.Name() { - case "address": - localVal = w.settings.address - case "port": - localVal = w.settings.port - case "retrieve-remote-addr": - localVal = w.settings.retrieveRemoteAddr - default: - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, - fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "") - return nil - } - - val, err := glib.GValue(localVal) - if err != nil { - gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, - fmt.Sprintf("Could not convert %v to GValue", localVal), - err.Error(), - ) - } - - return val -} - -func (w *websocketSrc) Constructed(self *glib.Object) { - elem := gst.ToElement(self) - w.setupSrcPad(elem) -} - -// * Element * // - -func (w *websocketSrc) ChangeState(self *gst.Element, transition gst.StateChange) (ret gst.StateChangeReturn) { - self.Log(CAT, gst.LevelTrace, fmt.Sprintf("Changing state: %s", transition)) - - ret = gst.StateChangeSuccess - - switch transition { - case gst.StateChangeNullToReady: - if err := w.prepare(self); err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "") - return gst.StateChangeFailure - } - case gst.StateChangePlayingToPaused: - w.pause(self) - case gst.StateChangeReadyToNull: - w.unprepare(self) - } - - // Apply the transition to the parent element - if ret = self.ParentChangeState(transition); ret == gst.StateChangeFailure { - return - } - - switch transition { - case gst.StateChangeReadyToPaused: - ret = gst.StateChangeNoPreroll - case gst.StateChangePausedToPlaying: - w.start(self) - case gst.StateChangePlayingToPaused: - ret = gst.StateChangeNoPreroll - case gst.StateChangePausedToReady: - w.stop(self) - } - - return -} diff --git a/examples/queries/main.go b/examples/queries/main.go index 7e7b54f..8cc5a34 100644 --- a/examples/queries/main.go +++ b/examples/queries/main.go @@ -20,29 +20,32 @@ import ( "strings" "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" + "github.com/diamondburned/gotk4/pkg/glib/v2" + "github.com/go-gst/go-gst/pkg/gst" ) -func queries(mainLoop *glib.MainLoop) error { +func queries() error { if len(os.Args) < 2 { fmt.Println("USAGE: queries ") os.Exit(1) } - gst.Init(nil) + gst.Init() + + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) // Let GStreamer create a pipeline from the parsed launch syntax on the cli. pipelineStr := strings.Join(os.Args[1:], " ") - pipeline, err := gst.NewPipelineFromString(pipelineStr) + ret, err := gst.ParseLaunch(pipelineStr) if err != nil { return err } + pipeline := ret.(gst.Binner) + // Get a reference to the pipeline bus - bus := pipeline.GetPipelineBus() + bus := pipeline.Bus() // Start the pipeline pipeline.SetState(gst.StatePlaying) @@ -54,14 +57,14 @@ func queries(mainLoop *glib.MainLoop) error { // Create a new position query and send it to the pipeline. // This will traverse all elements in the pipeline, until one feels // capable of answering the query. - pos := gst.NewPositionQuery(gst.FormatTime) + pos := gst.NewQueryPosition(gst.FormatTime) if ok := pipeline.Query(pos); !ok { fmt.Println("Failed to query position from pipeline") } // Create a new duration query and send it to the pipeline. // This will traverse all elements in the pipeline, until one feels // capable of answering the query. - dur := gst.NewDurationQuery(gst.FormatTime) + dur := gst.NewQueryDuration(gst.FormatTime) if ok := pipeline.Query(dur); !ok { fmt.Println("Failed to query duration from pipeline") } @@ -77,14 +80,14 @@ func queries(mainLoop *glib.MainLoop) error { } }() - bus.AddWatch(func(msg *gst.Message) bool { + bus.AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool { switch msg.Type() { - case gst.MessageEOS: + case gst.MessageEos: mainLoop.Quit() case gst.MessageError: - gstErr := msg.ParseError() - fmt.Printf("Error from %s: %s\n", msg.Source(), gstErr.Error()) - if debug := gstErr.DebugString(); debug != "" { + gstErr, debug := msg.ParseError() + fmt.Printf("Error from %s: %s\n", msg.Src(), gstErr.Error()) + if debug != "" { fmt.Println("go-gst-debug:", debug) } mainLoop.Quit() @@ -100,5 +103,7 @@ func queries(mainLoop *glib.MainLoop) error { } func main() { - examples.RunLoop(queries) + if err := queries(); err != nil { + fmt.Println(err) + } } diff --git a/examples/requestpad/main.go b/examples/requestpad/main.go index acec086..dabab4e 100644 --- a/examples/requestpad/main.go +++ b/examples/requestpad/main.go @@ -3,11 +3,11 @@ package main import ( "errors" "fmt" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" "os" "time" + + "github.com/go-gst/go-glib/glib" + "github.com/go-gst/go-gst/pkg/gst" ) type workflow struct { @@ -15,58 +15,39 @@ type workflow struct { } func (w *workflow) newSrc() { - src, err := gst.NewElementWithName("videotestsrc", "src2") - if err != nil { - fmt.Printf("err %v\n", err) - return - } - src.Set("is-live", true) + src := gst.ElementFactoryMake("videotestsrc", "src2") + + src.SetObjectProperty("is-live", true) w.Add(src) - caps, err := gst.NewElementWithName("capsfilter", "caps2") - if err != nil { - fmt.Printf("err %v\n", err) - return - } - caps.Set("caps", gst.NewCapsFromString("video/x-raw , width=640, height=360")) + caps := gst.ElementFactoryMake("capsfilter", "caps2") + + caps.SetObjectProperty("caps", gst.CapsFromString("video/x-raw , width=640, height=360")) w.Add(caps) src.Link(caps) // Get a sink pad on compositor - mixer, err := w.GetElementByName("mixer") - if err != nil { - fmt.Printf("err %v\n", err) - return - } - pad := mixer.GetRequestPad("sink_%u") - pad.SetProperty("xpos", 640) - pad.SetProperty("ypos", 0) + mixer := w.ByName("mixer") - caps.GetStaticPad("src").Link(pad) + pad := mixer.GetRequestPad("sink_%u") + pad.SetObjectProperty("xpos", 640) + pad.SetObjectProperty("ypos", 0) + + caps.StaticPad("src").Link(pad) caps.SyncStateWithParent() src.SyncStateWithParent() } func (w *workflow) delSrc() { - mixer, err := w.GetElementByName("mixer") - if err != nil { - fmt.Printf("err %v\n", err) - return - } + mixer := w.ByName("mixer") - src, err := w.GetElementByName("src2") - if err != nil { - fmt.Printf("err %v\n", err) - return - } - caps, err := w.GetElementByName("caps2") - if err != nil { - fmt.Printf("err %v\n", err) - return - } - pad := mixer.GetStaticPad("sink_1") + src := w.ByName("src2") + + caps := w.ByName("caps2") + + pad := mixer.StaticPad("sink_1") if pad == nil { fmt.Printf("pad is null\n") return @@ -81,26 +62,16 @@ func (w *workflow) delSrc() { } func createPipeline() (*gst.Pipeline, error) { - gst.Init(nil) - var err error - var w workflow - w.Pipeline, err = gst.NewPipeline("") + gst.Init() + ret, err := gst.ParseLaunch("videotestsrc ! video/x-raw , capsfilter caps=width=640,height=360 name=caps1 ! compositor name=mixer ! autovideosink") + if err != nil { - fmt.Println(err) os.Exit(2) } - elements, err := gst.NewElementMany("videotestsrc", "capsfilter", "compositor", "autovideosink") - caps := elements[1] - caps.SetProperty("caps", gst.NewCapsFromString("video/x-raw , width=640, height=360")) - caps.SetProperty("name", "caps1") - mixer := elements[2] - mixer.SetProperty("name", "mixer") - if err != nil { - fmt.Printf("err %v\n", err) - return nil, err - } - w.AddMany(elements...) - gst.ElementLinkMany(elements...) + + var w workflow + + w.Pipeline = ret.(*gst.Pipeline) go func() { time.Sleep(time.Second) @@ -118,19 +89,19 @@ func runPipeline(loop *glib.MainLoop, pipeline *gst.Pipeline) error { pipeline.SetState(gst.StatePlaying) // Add a message watch to the bus to quit on any error - pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { + pipeline.Bus().AddWatch(0, func(bus *gst.Bus, msg *gst.Message) bool { var err error // If the stream has ended or any element posts an error to the // bus, populate error. switch msg.Type() { - case gst.MessageEOS: + case gst.MessageEos: err = errors.New("end-of-stream") case gst.MessageError: // The parsed error implements the error interface, but also // contains additional debug information. - gerr := msg.ParseError() - fmt.Println("go-gst-debug:", gerr.DebugString()) + gerr, debug := msg.ParseError() + fmt.Println("go-gst-debug:", debug) err = gerr } @@ -149,11 +120,12 @@ func runPipeline(loop *glib.MainLoop, pipeline *gst.Pipeline) error { } func main() { - examples.RunLoop(func(loop *glib.MainLoop) error { - pipeline, err := createPipeline() - if err != nil { - return err - } - return runPipeline(loop, pipeline) - }) + loop := glib.NewMainLoop(glib.MainContextDefault(), false) + + pipeline, err := createPipeline() + if err != nil { + os.Exit(2) + } + + runPipeline(loop, pipeline) } diff --git a/examples/tagsetter/main.go b/examples/tagsetter/main.go index c103d60..92950d8 100644 --- a/examples/tagsetter/main.go +++ b/examples/tagsetter/main.go @@ -24,30 +24,29 @@ package main import ( "fmt" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" + coreglib "github.com/diamondburned/gotk4/pkg/core/glib" + "github.com/go-gst/go-gst/pkg/gst" ) func tagsetter() error { - gst.Init(nil) + gst.Init() - pipeline, err := gst.NewPipelineFromString( + ret, err := gst.ParseLaunch( "audiotestsrc wave=white-noise num-buffers=10000 ! flacenc ! filesink location=test.flac", ) if err != nil { return err } + pipeline := ret.(*gst.Pipeline) + // Query the pipeline for elements implementing the GstTagsetter interface. // In our case, this will return the flacenc element. - element, err := pipeline.GetByInterface(gst.InterfaceTagSetter) - if err != nil { - return err - } + element := pipeline.ByInterface(gst.GTypeTagSetter) // We actually just retrieved a *gst.Element with the above call. We can retrieve // the underying TagSetter interface like this. - tagsetter := element.TagSetter() + tagsetter := element.(*gst.TagSetter) // Tell the element implementing the GstTagsetter interface how to handle already existing // metadata. @@ -57,14 +56,14 @@ func tagsetter() error { // // The first parameter gst.TagMergeAppend tells the tagsetter to append this title // if there already is one. - tagsetter.AddTagValue(gst.TagMergeAppend, gst.TagTitle, "Special randomized white-noise") + tagsetter.AddTagValue(gst.TagMergeAppend, gst.TAG_TITLE, coreglib.NewValue("Special randomized white-noise")) pipeline.SetState(gst.StatePlaying) var cont bool var pipelineErr error for { - msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone) + msg := pipeline.Bus().TimedPop(gst.ClockTimeNone) if msg == nil { break } @@ -78,18 +77,18 @@ func tagsetter() error { } func handleMessage(msg *gst.Message) (bool, error) { - defer msg.Unref() switch msg.Type() { case gst.MessageTag: fmt.Println(msg) // Prirnt our tags - case gst.MessageEOS: + case gst.MessageEos: return false, nil case gst.MessageError: - return false, msg.ParseError() + err, _ := msg.ParseError() + return false, err } return true, nil } func main() { - examples.Run(tagsetter) + } diff --git a/examples/toc/main.go b/examples/toc/main.go index a1c93be..4330b14 100644 --- a/examples/toc/main.go +++ b/examples/toc/main.go @@ -17,57 +17,43 @@ import ( "os" "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/examples" - "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/pkg/gst" ) -func tagsetter(mainLoop *glib.MainLoop) error { - gst.Init(nil) +func tagsetter() error { + gst.Init() if len(os.Args) < 2 { return errors.New("usage: toc ") } - pipeline, err := gst.NewPipeline("") - if err != nil { - return err - } + pipeline := gst.NewPipeline("") - src, err := gst.NewElement("filesrc") - if err != nil { - return err - } - decodebin, err := gst.NewElement("decodebin") - if err != nil { - return err - } + src := gst.ElementFactoryMake("filesrc", "") - src.SetProperty("location", os.Args[1]) + decodebin := gst.ElementFactoryMake("decodebin", "") + + src.SetObjectProperty("location", os.Args[1]) pipeline.AddMany(src, decodebin) - gst.ElementLinkMany(src, decodebin) + gst.LinkMany(src, decodebin) // Connect to decodebin's pad-added signal, that is emitted whenever it found another stream // from the input file and found a way to decode it to its raw format. - decodebin.Connect("pad-added", func(_ *gst.Element, srcPad *gst.Pad) { + decodebin.ConnectPadAdded(func(srcPad *gst.Pad) { // In this example, we are only interested about parsing the ToC, so // we simply pipe every encountered stream into a fakesink, essentially // throwing away the data. - elems, err := gst.NewElementMany("queue", "fakesink") - if err != nil { - fmt.Println("Could not create decodebin pipeline") - return - } - pipeline.AddMany(elems...) - gst.ElementLinkMany(elems...) - for _, e := range elems { - e.SyncStateWithParent() - } + queue := gst.ElementFactoryMake("queue", "") + fakesink := gst.ElementFactoryMake("fakesink", "") - queue := elems[0] - sinkPad := queue.GetStaticPad("sink") + pipeline.AddMany(queue, fakesink) + gst.LinkMany(queue, fakesink) + queue.SyncStateWithParent() + fakesink.SyncStateWithParent() + + sinkPad := queue.StaticPad("sink") if sinkPad == nil { fmt.Println("Could not get static pad from sink") return @@ -77,8 +63,8 @@ func tagsetter(mainLoop *glib.MainLoop) error { Link(sinkPad) }) - if err := pipeline.SetState(gst.StatePaused); err != nil { - return err + if ret := pipeline.BlockSetState(gst.StatePaused, gst.ClockTime(time.Second)); ret != gst.StateChangeSuccess { + return fmt.Errorf("could not change state") } // Instead of using the main loop, we manually iterate over GStreamer's bus messages @@ -87,37 +73,32 @@ func tagsetter(mainLoop *glib.MainLoop) error { // timed_pop on the bus with the desired timeout for when to stop waiting for new messages. // (-1 = Wait forever) for { - msg := pipeline.GetPipelineBus().TimedPop(gst.ClockTimeNone) + msg := pipeline.Bus().TimedPop(gst.ClockTimeNone) switch msg.Type() { // When we use this method of popping from the bus (instead of a Watch), we own a // reference to every message received (this may be abstracted later). default: // fmt.Println(msg) - msg.Unref() // End of stream - case gst.MessageEOS: - msg.Unref() - + case gst.MessageEos: // Errors from any elements case gst.MessageError: - gerr := msg.ParseError() - if debug := gerr.DebugString(); debug != "" { + gerr, debug := msg.ParseError() + if debug != "" { fmt.Println("go-gst-debug:", debug) } - msg.Unref() return gerr // Some element found a ToC in the current media stream and told // us by posting a message to GStreamer's bus. - case gst.MessageTOC: + case gst.MessageToc: // Parse the toc from the message - toc, updated := msg.ParseTOC() - msg.Unref() - fmt.Printf("Received toc: %s - updated %v\n", toc.GetScope(), updated) + toc, updated := msg.ParseToc() + fmt.Printf("Received toc: %s - updated %v\n", toc.Scope().String(), updated) // Get a list of tags that are ToC specific. - if tags := toc.GetTags(); tags != nil { + if tags := toc.Tags(); tags != nil { fmt.Println("- tags:", tags) } // ToCs do not have a fixed structure. Depending on the format that @@ -127,43 +108,41 @@ func tagsetter(mainLoop *glib.MainLoop) error { // interpreting the ToC manually. // In this example, we simply want to print the ToC structure, so // we iterate everything and don't try to interpret anything. - for _, entry := range toc.GetEntries() { + for _, entry := range toc.Entries() { // Every entry in a ToC has its own type. One type could for // example be Chapter. - fmt.Printf("\t%s - %s\n", entry.GetEntryTypeString(), entry.GetUID()) + fmt.Printf("\t%s - %s\n", entry.EntryType().String(), entry.Uid()) // Every ToC entry can have a set of timestamps (start, stop). - if ok, start, stop := entry.GetStartStopTimes(); ok { + if start, stop, ok := entry.StartStopTimes(); ok { startDur := time.Duration(start) * time.Nanosecond stopDur := time.Duration(stop) * time.Nanosecond fmt.Printf("\t- start: %s, stop: %s\n", startDur, stopDur) } // Every ToC entry can have tags to it. - if tags := entry.GetTags(); tags != nil { + if tags := entry.Tags(); tags != nil { fmt.Println("\t- tags:", tags) } // Every ToC entry can have a set of child entries. // With this structure, you can create trees of arbitrary depth. - for _, subEntry := range entry.GetSubEntries() { - fmt.Printf("\n\t\t%s - %s\n", subEntry.GetEntryTypeString(), subEntry.GetUID()) - if ok, start, stop := entry.GetStartStopTimes(); ok { + for _, subEntry := range entry.SubEntries() { + fmt.Printf("\n\t\t%s - %s\n", subEntry.EntryType().String(), subEntry.Uid()) + if start, stop, ok := entry.StartStopTimes(); ok { startDur := time.Duration(start) * time.Nanosecond stopDur := time.Duration(stop) * time.Nanosecond fmt.Printf("\t\t- start: %s, stop: %s\n", startDur, stopDur) } - if tags := entry.GetTags(); tags != nil { + if tags := entry.Tags(); tags != nil { fmt.Println("\t\t- tags:", tags) } } } - - toc.Unref() } } } func main() { - examples.RunLoop(tagsetter) + tagsetter() }