diff --git a/.gitignore b/.gitignore index bb02864..1e54846 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ vendor/ # Build outputs dist/ -.vscode/ \ No newline at end of file +.vscode/ +_bin/ \ No newline at end of file diff --git a/Makefile b/Makefile index 9fa3272..27bb3a1 100644 --- a/Makefile +++ b/Makefile @@ -5,3 +5,16 @@ build-cmd: ARGS ?= run-cmd: build-cmd dist/go-gst $(ARGS) + +GOLANGCI_VERSION ?= 1.23.8 +GOLANGCI_LINT ?= _bin/golangci-lint +GOLANGCI_DOWNLOAD_URL ?= https://github.com/golangci/golangci-lint/releases/download/v${GOLANGCI_VERSION}/golangci-lint-${GOLANGCI_VERSION}-$(shell uname | tr A-Z a-z)-amd64.tar.gz + +$(GOLANGCI_LINT): + mkdir -p $(dir $(GOLANGCI_LINT)) + cd $(dir $(GOLANGCI_LINT)) && curl -JL $(GOLANGCI_DOWNLOAD_URL) | tar xzf - + chmod +x $(dir $(GOLANGCI_LINT))golangci-lint-$(GOLANGCI_VERSION)-$(shell uname | tr A-Z a-z)-amd64/golangci-lint + ln -s golangci-lint-$(GOLANGCI_VERSION)-$(shell uname | tr A-Z a-z)-amd64/golangci-lint $(GOLANGCI_LINT) + +lint: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) run -v --timeout 300s --skip-dirs cmd/ \ No newline at end of file diff --git a/README.md b/README.md index 0ad3617..a6b8a97 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ Go bindings for the gstreamer C library -[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-rounded)](https://pkg.go.dev/github.com/tinyzimmer/go-gst/gst) -[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/tinyzimmer/go-gst/gst) +[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-rounded)](https://pkg.go.dev/github.com/tinyzimmer/go-gst) +[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/tinyzimmer/go-gst) [![GoReportCard example](https://goreportcard.com/badge/github.com/nanomsg/mangos)](https://goreportcard.com/report/github.com/tinyzimmer/go-gst) This package was originally written to aid the audio support in [`kvdi`](https://github.com/tinyzimmer/kvdi). @@ -34,7 +34,7 @@ For now the functionality is limitted to GIF encoing and other arbitrary pipelin If I extend it further I'll publish releases, but for now, you can retrieve it with `go get`. ```bash -go get github.com/tinyzimmer/go-gst-launch/cmd/go-gst-launch +go get github.com/tinyzimmer/go-gst/cmd/go-gst ``` The usage is described below: diff --git a/cmd/go-gst/completion.go b/cmd/go-gst/completion.go index b80434b..a10e1ce 100644 --- a/cmd/go-gst/completion.go +++ b/cmd/go-gst/completion.go @@ -47,16 +47,25 @@ $ yourprogram completion fish > ~/.config/fish/completions/yourprogram.fish DisableFlagsInUseLine: true, ValidArgs: []string{"bash", "zsh", "fish", "powershell"}, Args: cobra.ExactValidArgs(1), - Run: func(cmd *cobra.Command, args []string) { + RunE: func(cmd *cobra.Command, args []string) error { switch args[0] { case "bash": - cmd.Root().GenBashCompletion(os.Stdout) + if err := cmd.Root().GenBashCompletion(os.Stdout); err != nil { + return err + } case "zsh": - cmd.Root().GenZshCompletion(os.Stdout) + if err := cmd.Root().GenZshCompletion(os.Stdout); err != nil { + return err + } case "fish": - cmd.Root().GenFishCompletion(os.Stdout, true) + if err := cmd.Root().GenFishCompletion(os.Stdout, true); err != nil { + return err + } case "powershell": - cmd.Root().GenPowerShellCompletion(os.Stdout) + if err := cmd.Root().GenPowerShellCompletion(os.Stdout); err != nil { + return err + } } + return nil }, } diff --git a/cmd/go-gst/gif.go b/cmd/go-gst/gif.go index 53ffb6f..1262774 100644 --- a/cmd/go-gst/gif.go +++ b/cmd/go-gst/gif.go @@ -66,6 +66,9 @@ func gifEncode(cmd *cobra.Command, args []string) error { } tmpDir, err := ioutil.TempDir("", "") + if err != nil { + return err + } defer os.RemoveAll(tmpDir) launchStr := fmt.Sprintf( @@ -75,11 +78,11 @@ func gifEncode(cmd *cobra.Command, args []string) error { logInfo("gif", "Converting video to image frames") - gstPipeline, err := gst.NewPipelineFromLaunchString(launchStr, gst.PipelineInternalOnly) + gstPipeline, err := gst.NewPipelineFromString(launchStr) if err != nil { return err } - defer gstPipeline.Close() + defer gstPipeline.Destroy() if err := gstPipeline.Start(); err != nil { return err diff --git a/cmd/go-gst/launch.go b/cmd/go-gst/launch.go index 83736fd..fa4104f 100644 --- a/cmd/go-gst/launch.go +++ b/cmd/go-gst/launch.go @@ -3,10 +3,12 @@ package main import ( "errors" "io" + "os" "strings" "github.com/spf13/cobra" "github.com/tinyzimmer/go-gst-launch/gst" + "github.com/tinyzimmer/go-gst-launch/gst/gstauto" ) func init() { @@ -16,7 +18,7 @@ func init() { var launchCmd = &cobra.Command{ Use: "launch", Short: "Run a generic pipeline", - Long: `Uses the provided pipeline string to encode/decode the data in the pipeline.`, + Long: `Uses the provided gstreamer string to encode/decode the data in the pipeline.`, PreRunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { return errors.New("The pipeline string cannot be empty") @@ -33,41 +35,50 @@ func launch(cmd *cobra.Command, args []string) error { return err } - var flags gst.PipelineFlags - if src != nil { - flags = flags | gst.PipelineWrite - } - if dest != nil { - flags = flags | gst.PipelineRead - } - pipelineString := strings.Join(args, " ") logInfo("pipeline", "Creating pipeline") - gstPipeline, err := gst.NewPipelineFromLaunchString(pipelineString, flags) + + pipeliner, err := getPipeline(src, dest, pipelineString) if err != nil { return err } - defer gstPipeline.Close() - if verbose { - setupVerbosePipelineListeners(gstPipeline, "pipeline") + setupVerbosePipelineListeners(pipeliner.Pipeline(), "pipeline") } logInfo("pipeline", "Starting pipeline") - if err := gstPipeline.Start(); err != nil { + if err := pipeliner.Pipeline().Start(); err != nil { return err } if src != nil { - go io.Copy(gstPipeline, src) + pipelineWriter := pipeliner.(gstauto.WritePipeliner) + go io.Copy(pipelineWriter, src) + defer pipelineWriter.Close() } if dest != nil { - go io.Copy(dest, gstPipeline) + pipelineReader := pipeliner.(gstauto.ReadPipeliner) + go io.Copy(dest, pipelineReader) + defer pipelineReader.Close() + } else { + defer pipeliner.Pipeline().Destroy() } - gst.Wait(gstPipeline) - + gst.Wait(pipeliner.Pipeline()) return nil } + +func getPipeline(src, dest *os.File, pipelineString string) (gstauto.Pipeliner, error) { + if src != nil && dest != nil { + return gstauto.NewPipelineReadWriterSimpleFromString(pipelineString) + } + if src != nil { + return gstauto.NewPipelineWriterSimpleFromString(pipelineString) + } + if dest != nil { + return gstauto.NewPipelineReaderSimpleFromString(pipelineString) + } + return gstauto.NewPipelinerSimpleFromString(pipelineString) +} diff --git a/cmd/go-gst/main.go b/cmd/go-gst/main.go index a0c5796..7eb1f68 100644 --- a/cmd/go-gst/main.go +++ b/cmd/go-gst/main.go @@ -8,8 +8,8 @@ import ( ) var ( - srcFile, destFile, pipelineStr string - verbose, fromStdin, toStdout bool + srcFile, destFile string + verbose, fromStdin, toStdout bool rootCmd = &cobra.Command{ Use: "go-gst", diff --git a/cmd/go-gst/print_object_properties.go b/cmd/go-gst/print_object_properties.go index 1cd3656..d140ab1 100644 --- a/cmd/go-gst/print_object_properties.go +++ b/cmd/go-gst/print_object_properties.go @@ -273,10 +273,12 @@ func printObjectPropertiesInfo(obj *gst.Object, description string) { colorGreen.printf(`"%s"`, param.ValueType.Name()) if param.ValueType.Name() == "GstStructure" { structure := gst.StructureFromGValue(param.DefaultValue) - for key, val := range structure.Values() { - colorReset.printIndent(26, "(gpointer) ") - colorYellow.printf("%15s:", key) - colorBlue.printf("%v", val) + if structure != nil { + for key, val := range structure.Values() { + colorReset.printIndent(26, "(gpointer) ") + colorYellow.printf("%15s:", key) + colorBlue.printf("%v", val) + } } } diff --git a/cmd/go-gst/websocket.go b/cmd/go-gst/websocket.go index 007001e..256c767 100644 --- a/cmd/go-gst/websocket.go +++ b/cmd/go-gst/websocket.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" "github.com/tinyzimmer/go-gst-launch/gst" + "github.com/tinyzimmer/go-gst-launch/gst/gstauto" "golang.org/x/net/websocket" ) @@ -109,72 +110,69 @@ func handleWebsocketConnection(wsconn *websocket.Conn) { logInfo("websocket", "New connection from", wsconn.Request().RemoteAddr) wsconn.PayloadType = websocket.BinaryFrame - var playbackPipeline, recordingPipeline, sinkPipeline *gst.Pipeline - var err error - - playbackPipeline, err = newPlaybackPipeline() + playbackPipeline, err := newPlaybackPipeline() if err != nil { logInfo("websocket", "ERROR:", err.Error()) return } - playbackPipeline.SetAutoFlush(true) - logInfo("websocket", "Starting playback pipeline") - if err = playbackPipeline.Start(); err != nil { + if err = playbackPipeline.Pipeline().Start(); err != nil { logInfo("websocket", "ERROR:", err.Error()) return } if verbose { - setupVerbosePipelineListeners(playbackPipeline, "playback") + setupVerbosePipelineListeners(playbackPipeline.Pipeline(), "playback") } + var recordingPipeline gstauto.WritePipeliner + var sinkPipeline gstauto.Pipeliner if micFifo != "" { - recordingPipeline, err = newRecordingPipeline() + recordingPipeline, err := newRecordingPipeline() if err != nil { logInfo("websocket", "Could not open pipeline for recording:", err.Error()) return } defer recordingPipeline.Close() - sinkPipeline, err = newSinkPipeline() + sinkPipeline, err := newSinkPipeline() if err != nil { logInfo("websocket", "Could not open null sink pipeling. Disabling recording.") return } - defer sinkPipeline.Close() + defer sinkPipeline.Pipeline().Destroy() } if recordingPipeline != nil && sinkPipeline != nil { logInfo("websocket", "Starting recording pipeline") - if err = recordingPipeline.Start(); err != nil { + if err = recordingPipeline.Pipeline().Start(); err != nil { logInfo("websocket", "Could not start recording pipeline") return } logInfo("websocket", "Starting sink pipeline") - if err = sinkPipeline.Start(); err != nil { + if err = sinkPipeline.Pipeline().Start(); err != nil { logInfo("websocket", "Could not start sink pipeline") return } if verbose { - setupVerbosePipelineListeners(sinkPipeline, "mic-null-sink") + setupVerbosePipelineListeners(sinkPipeline.Pipeline(), "mic-null-sink") } var runMicFunc func() runMicFunc = func() { if verbose { - setupVerbosePipelineListeners(recordingPipeline, "recorder") + setupVerbosePipelineListeners(recordingPipeline.Pipeline(), "recorder") } go io.Copy(recordingPipeline, wsconn) go func() { var lastState gst.State - for msg := range recordingPipeline.GetBus().MessageChan() { + for msg := range recordingPipeline.Pipeline().GetBus().MessageChan() { defer msg.Unref() switch msg.Type() { case gst.MessageStateChanged: - if lastState == gst.StatePlaying && recordingPipeline.GetState() != gst.StatePlaying { + if lastState == gst.StatePlaying && recordingPipeline.Pipeline().GetState() != gst.StatePlaying { var nerr error recordingPipeline.Close() recordingPipeline, nerr = newRecordingPipeline() @@ -183,13 +181,13 @@ func handleWebsocketConnection(wsconn *websocket.Conn) { return } logInfo("websocket", "Restarting recording pipeline") - if nerr = recordingPipeline.Start(); nerr != nil { + if nerr = recordingPipeline.Pipeline().Start(); nerr != nil { logInfo("websocket", "Could not start new recording pipeline, stopping input stream") } runMicFunc() return } - lastState = recordingPipeline.GetState() + lastState = recordingPipeline.Pipeline().GetState() } } }() @@ -208,25 +206,17 @@ func handleWebsocketConnection(wsconn *websocket.Conn) { return } defer srcFile.Close() - stat, err := srcFile.Stat() - if err != nil { - return - } - appSrc := playbackPipeline.GetAppSrc() - appSrc.SetSize(stat.Size()) - appSrc.PushBuffer(srcFile) - for { - if ret := appSrc.EndStream(); ret == gst.FlowOK { - break - } - } - + // stat, err := srcFile.Stat() + // if err != nil { + // return + // } + go io.Copy(playbackPipeline.(gstauto.ReadWritePipeliner), srcFile) } - gst.Wait(playbackPipeline) + gst.Wait(playbackPipeline.Pipeline()) } -func newPlaybackPipelineFromString() (*gst.Pipeline, error) { +func newPlaybackPipelineFromString() (gstauto.ReadWritePipeliner, error) { pipelineString := "decodebin ! audioconvert ! audioresample" switch encoding { @@ -240,18 +230,18 @@ func newPlaybackPipelineFromString() (*gst.Pipeline, error) { logInfo("playback", "Using pipeline string", pipelineString) } - return gst.NewPipelineFromLaunchString(pipelineString, gst.PipelineReadWrite|gst.PipelineUseGstApp) + return gstauto.NewPipelineReadWriterSimpleFromString(pipelineString) } -func newPlaybackPipeline() (*gst.Pipeline, error) { +func newPlaybackPipeline() (gstauto.ReadPipeliner, error) { if srcFile != "" { return newPlaybackPipelineFromString() } - cfg := &gst.PipelineConfig{Elements: []*gst.PipelineElement{}} + cfg := &gstauto.PipelineConfig{Elements: []*gstauto.PipelineElement{}} - pulseSrc := &gst.PipelineElement{ + pulseSrc := &gstauto.PipelineElement{ Name: "pulsesrc", Data: map[string]interface{}{"server": pulseServer}, SinkCaps: gst.NewRawCaps("S16LE", 24000, 2), @@ -265,19 +255,19 @@ func newPlaybackPipeline() (*gst.Pipeline, error) { switch encoding { case "opus": - cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "cutter"}) - cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "opusenc"}) - cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "webmmux"}) + cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "cutter"}) + cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "opusenc"}) + cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "webmmux"}) case "vorbis": - cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "vorbisenc"}) - cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "oggmux"}) + cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "vorbisenc"}) + cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "oggmux"}) } - return gst.NewPipelineFromConfig(cfg, gst.PipelineRead|gst.PipelineUseGstApp, nil) + return gstauto.NewPipelineReaderSimpleFromConfig(cfg) } -func newRecordingPipeline() (*gst.Pipeline, error) { - return gst.NewPipelineFromLaunchString(newPipelineStringFromOpts(), gst.PipelineWrite) +func newRecordingPipeline() (gstauto.WritePipeliner, error) { + return gstauto.NewPipelineWriterSimpleFromString(newPipelineStringFromOpts()) } func newPipelineStringFromOpts() string { @@ -290,9 +280,9 @@ func newPipelineStringFromOpts() string { ) } -func newSinkPipeline() (*gst.Pipeline, error) { - cfg := &gst.PipelineConfig{ - Elements: []*gst.PipelineElement{ +func newSinkPipeline() (gstauto.Pipeliner, error) { + cfg := &gstauto.PipelineConfig{ + Elements: []*gstauto.PipelineElement{ { Name: "pulsesrc", Data: map[string]interface{}{"server": pulseServer, "device": micName}, @@ -304,5 +294,5 @@ func newSinkPipeline() (*gst.Pipeline, error) { }, }, } - return gst.NewPipelineFromConfig(cfg, gst.PipelineInternalOnly, nil) + return gstauto.NewPipelinerSimpleFromConfig(cfg) } diff --git a/gst/doc.go b/gst/doc.go index 61dfbea..d1bb558 100644 --- a/gst/doc.go +++ b/gst/doc.go @@ -1,141 +1,6 @@ /* -Package gst provides wrappers for building gstreamer pipelines and then -reading and/or writing from either end of the pipeline. - -It uses cgo to interface with the gstreamer-1.0 C API. - -A simple opus/webm encoder created from a launch string could look like this: - - import ( - "os" - "github.com/tinyzimmer/go-gst-launch/gst" - ) - - func main() { - gst.Init() - encoder, err := gst.NewPipelineFromLaunchString("opusenc ! webmmux", gst.PipelineReadWrite) - if err != nil { - panic(err) - } - - // You should close even if you don't start the pipeline, since this - // will free resources created by gstreamer. - defer encoder.Close() - - if err := encoder.Start() ; err != nil { - panic(err) - } - - go func() { - encoder.Write(...) // Write raw audio data to the pipeline - }() - - // don't actually do this - copy encoded audio to stdout - if _, err := io.Copy(os.Stdout, encoder) ; err != nil { - panic(err) - } - } - -You can accomplish the same thing using the "configuration" functionality provided by NewPipelineFromConfig(). -Here is an example that will record from a pulse server and make opus/webm data available on the Reader. - - import ( - "io" - "os" - "github.com/tinyzimmer/go-gst-launch/gst" - ) - - func main() { - gst.Init() - encoder, err := gst.NewPipelineFromConfig(&gst.PipelineConfig{ - Plugins: []*gst.Plugin{ - { - Name: "pulsesrc", - Data: map[string]interface{}{ - "server": "/run/user/1000/pulse/native", - "device": "playback-device.monitor", - }, - SinkCaps: gst.NewRawCaps("S16LE", 24000, 2), - }, - { - Name: "opusenc", - }, - { - Name: "webmmux", - }, - }, - }, gst.PipelineRead, nil) - if err != nil { - panic(err) - } - - defer encoder.Close() - - if err := encoder.Start() ; err != nil { - panic(err) - } - - // Create an output file - f, err := os.Create("out.opus") - if err != nil { - panic(err) - } - - // Copy the data from the pipeline to the file - if err := io.Copy(f, encoder) ; err != nil { - panic(err) - } - - } - - -There are two channels exported for listening for messages from the pipeline. -An example of listening to messages on a fake pipeline for 10 seconds: - - package main - - import ( - "fmt" - "time" - - "github.com/tinyzimmer/go-gst-launch/gst" - ) - - func main() { - gst.Init() - - pipeline, err := gst.NewPipelineFromLaunchString("audiotestsrc ! fakesink", gst.PipelineInternalOnly) - if err != nil { - panic(err) - } - - defer pipeline.Close() - - go func() { - for msg := range pipeline.MessageChan() { - fmt.Println("Got message:", msg.TypeName()) - } - }() - - go func() { - for msg := range pipeline.ErrorChan() { - fmt.Println("Got error:", err) - } - }() - - if err := pipeline.Start(); err != nil { - fmt.Println("Pipeline failed to start") - return - } - - time.Sleep(time.Second * 10) - } - - -The package also exposes some low level functionality for building pipelines -and doing dynamic linking yourself. See the NewPipeline() function for creating an -empty pipeline that you can then build out using the other structs and methods provided -by this package. - +Package gst contains bindings for the gstreamer C API. If you are trying +to build simple pipelines quickly (and optiionally readers/writers) see +the gstauto package. */ package gst diff --git a/gst/gst_app_src.go b/gst/gst_app_src.go index 2298428..d911a4d 100644 --- a/gst/gst_app_src.go +++ b/gst/gst_app_src.go @@ -67,18 +67,4 @@ func (a *AppSrc) PushBuffer(data io.Reader) FlowReturn { return FlowReturn(ret) } -// FlowReturn is go type casting for GstFlowReturn. -type FlowReturn C.GstFlowReturn - -// Type casting of the GstFlowReturn types. Custom ones are omitted for now. -const ( - FlowOK FlowReturn = C.GST_FLOW_OK // Data passing was ok - FlowNotLinked = C.GST_FLOW_NOT_LINKED // Pad is not linked - FlowFlushing = C.GST_FLOW_FLUSHING // Pad is flushing - FlowEOS = C.GST_FLOW_EOS // Pad is EOS - FlowNotNegotiated = C.GST_FLOW_NOT_NEGOTIATED // Pad is not negotiated - FlowError = C.GST_FLOW_ERROR // Some (fatal) error occurred - FlowNotSupported = C.GST_FLOW_NOT_SUPPORTED // The operation is not supported. -) - func wrapAppSrc(elem *Element) *AppSrc { return &AppSrc{elem} } diff --git a/gst/gst_constants.go b/gst/gst_constants.go index 0d63eef..8208fbd 100644 --- a/gst/gst_constants.go +++ b/gst/gst_constants.go @@ -196,3 +196,17 @@ const ( MiniObjectFlagMayBeLeaked MiniObjectFlags = C.GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED // (4) – the object is expected to stay alive even after gst_deinit has been called and so should be ignored by leak detection tools. (Since: 1.10) MiniObjectFlagLast MiniObjectFlags = C.GST_MINI_OBJECT_FLAG_LAST // (16) – first flag that can be used by subclasses. ) + +// FlowReturn is go type casting for GstFlowReturn. +type FlowReturn int + +// Type casting of the GstFlowReturn types. Custom ones are omitted for now. +const ( + FlowOK FlowReturn = C.GST_FLOW_OK // Data passing was ok + FlowNotLinked FlowReturn = C.GST_FLOW_NOT_LINKED // Pad is not linked + FlowFlushing FlowReturn = C.GST_FLOW_FLUSHING // Pad is flushing + FlowEOS FlowReturn = C.GST_FLOW_EOS // Pad is EOS + FlowNotNegotiated FlowReturn = C.GST_FLOW_NOT_NEGOTIATED // Pad is not negotiated + FlowError FlowReturn = C.GST_FLOW_ERROR // Some (fatal) error occurred + FlowNotSupported FlowReturn = C.GST_FLOW_NOT_SUPPORTED // The operation is not supported. +) diff --git a/gst/gst_mini_object.go b/gst/gst_mini_object.go index f40ac88..c6ceb43 100644 --- a/gst/gst_mini_object.go +++ b/gst/gst_mini_object.go @@ -39,8 +39,11 @@ func NewMiniObject(flags MiniObjectFlags, gtype glib.Type) *MiniObject { // native returns the pointer to the underlying object. func (m *MiniObject) unsafe() unsafe.Pointer { return m.ptr } +// Parent returns the parent of this MiniObject +func (m *MiniObject) Parent() *MiniObject { return m.parent } + // Instance returns the native GstMiniObject instance. -func (m *MiniObject) Instance() *C.GstMiniObject { return C.toGstMiniObject(m.ptr) } +func (m *MiniObject) Instance() *C.GstMiniObject { return C.toGstMiniObject(m.unsafe()) } // Ref increases the ref count on this object by one. func (m *MiniObject) Ref() { C.gst_mini_object_ref(m.Instance()) } diff --git a/gst/g_parameter_spec.go b/gst/gst_parameter_spec.go similarity index 100% rename from gst/g_parameter_spec.go rename to gst/gst_parameter_spec.go diff --git a/gst/gst_pipeline.go b/gst/gst_pipeline.go index b9f6a8c..b50385e 100644 --- a/gst/gst_pipeline.go +++ b/gst/gst_pipeline.go @@ -9,39 +9,14 @@ package gst import "C" import ( - "bufio" "errors" "fmt" - "io" - "os" "strings" "unsafe" "github.com/gotk3/gotk3/glib" ) -// PipelineFlags represents arguments passed to a new Pipeline. -type PipelineFlags int - -const ( - // PipelineInternalOnly signals that this pipeline only handles data internally. - PipelineInternalOnly PipelineFlags = 1 << iota - // PipelineRead signals that the Read() method can be used on the end of this pipeline. - PipelineRead - // PipelineWrite signals that the Write() method can be used on the start of this pipeline. - PipelineWrite - // PipelineUseGstApp signals the desire to use an AppSink or AppSrc instead of the default - // os pipes, fdsrc, and fdsink. - // When using this flag, you should interact with the pipeline using the GetAppSink and - // GetAppSrc methods. - PipelineUseGstApp - // PipelineReadWrite signals that this pipeline can be both read and written to. - PipelineReadWrite = PipelineRead | PipelineWrite -) - -// has returns true if these flags contain the given flag. -func (p PipelineFlags) has(b PipelineFlags) bool { return p&b != 0 } - // Pipeline is a go implementation of a GstPipeline. Helper methods are provided for constructing // pipelines either using file descriptors or the Appsrc/Appsink APIs. The struct itself implements // a ReadWriteCloser. @@ -51,47 +26,25 @@ type Pipeline struct { // a local reference to the bus so duplicates aren't created // when retrieved by the user bus *Bus - - // The buffers backing the Read and Write methods - destBuf *bufio.Reader - srcBuf *bufio.Writer - - // used with PipelineWrite - srcReader, srcWriter *os.File - // used with PipelineRead - destReader, destWriter *os.File - - // used with PipelineWrite AND PipelineGstApp - appSrc *AppSrc - // used with PipelineRead AND PipelineGstApp - appSink *AppSink - autoFlush bool // when set to true, the contents of the app sink are automatically flushed to the read buffer. - - // The element that represents the source/dest pipeline - // and any caps to apply to it. - srcElement *Element - srcCaps *Caps - destElement *Element - - // whether or not the pipeline was built from a string. this is checked when - // starting to see who is responsible for build and linking the buffers. - pipelineFromHelper bool - - // A channel where a caller can listen for errors asynchronously. - errCh chan error - // A channel where a caller can listen for messages - msgCh []chan *Message } -func newEmptyPipeline() (*C.GstPipeline, error) { - pipeline := C.gst_pipeline_new((*C.gchar)(nil)) +// NewPipeline allocates and returns a new empty pipeline. If name is empty, one +// is generated by gstreamer. +func NewPipeline(name string) (*Pipeline, error) { + var cChar *C.char + if name != "" { + cChar = C.CString(name) + defer C.free(unsafe.Pointer(cChar)) + } + pipeline := C.gst_pipeline_new((*C.gchar)(cChar)) if pipeline == nil { return nil, errors.New("Could not create new pipeline") } - return C.toGstPipeline(unsafe.Pointer(pipeline)), nil + return wrapPipeline(glib.Take(unsafe.Pointer(pipeline))), nil } -func newPipelineFromString(launchv string) (*C.GstPipeline, error) { +// NewPipelineFromString creates a new gstreamer pipeline from the given launch string. +func NewPipelineFromString(launchv string) (*Pipeline, error) { if len(strings.Split(launchv, "!")) < 2 { return nil, fmt.Errorf("Given string is too short for a pipeline: %s", launchv) } @@ -104,143 +57,12 @@ func newPipelineFromString(launchv string) (*C.GstPipeline, error) { errMsg := C.GoString(gerr.message) return nil, errors.New(errMsg) } - return C.toGstPipeline(unsafe.Pointer(pipeline)), nil -} - -// NewPipeline builds and returns a new empty Pipeline instance. -func NewPipeline(flags PipelineFlags) (*Pipeline, error) { - pipelineElement, err := newEmptyPipeline() - if err != nil { - return nil, err - } - - pipeline := wrapPipeline(glib.Take(unsafe.Pointer(pipelineElement))) - - if err := applyFlags(pipeline, flags); err != nil { - return nil, err - } - - return pipeline, nil -} - -func applyFlags(pipeline *Pipeline, flags PipelineFlags) error { - // If the user wants to be able to write to the pipeline, set up the - // write-buffers - if flags.has(PipelineWrite) { - // Set up a pipe - if err := pipeline.setupWriters(); err != nil { - return err - } - } - - // If the user wants to be able to read from the pipeline, setup the - // read-buffers. - if flags.has(PipelineRead) { - if err := pipeline.setupReaders(); err != nil { - return err - } - } - - return nil -} - -func (p *Pipeline) setupWriters() error { - var err error - p.srcReader, p.srcWriter, err = os.Pipe() - if err != nil { - return err - } - p.srcBuf = bufio.NewWriter(p.srcWriter) - return nil -} - -func (p *Pipeline) setupReaders() error { - var err error - p.destReader, p.destWriter, err = os.Pipe() - if err != nil { - return err - } - p.destBuf = bufio.NewReader(p.destReader) - return nil + return wrapPipeline(glib.Take(unsafe.Pointer(pipeline))), nil } // Instance returns the native GstPipeline instance. func (p *Pipeline) Instance() *C.GstPipeline { return C.toGstPipeline(p.unsafe()) } -// Read implements a Reader and returns data from the read buffer. -func (p *Pipeline) Read(b []byte) (int, error) { - if p.destBuf == nil { - return 0, io.ErrClosedPipe - } - return p.destBuf.Read(b) -} - -// readerFd returns the file descriptor for the read buffer, or 0 if -// there isn't one. It returns the file descriptor that can be written to -// by gstreamer. -func (p *Pipeline) readerFd() uintptr { - if p.destWriter == nil { - return 0 - } - return p.destWriter.Fd() -} - -// Write implements a Writer and places data in the write buffer. -func (p *Pipeline) Write(b []byte) (int, error) { - if p.srcBuf == nil { - return 0, io.ErrClosedPipe - } - return p.srcBuf.Write(b) -} - -// writerFd returns the file descriptor for the write buffer, or 0 if -// there isn't one. It returns the file descriptor that can be read from -// by gstreamer. -func (p *Pipeline) writerFd() uintptr { - if p.srcWriter == nil { - return 0 - } - return p.srcReader.Fd() -} - -// SetWriterCaps sets the caps on the write-buffer. You will usually want to call this -// on a custom pipeline, unless you are using downstream elements that do dynamic pad -// linking. -func (p *Pipeline) SetWriterCaps(caps *Caps) { p.srcCaps = caps } - -// LinkWriterTo links the write buffer on this Pipeline to the given element. This must -// be called when the pipeline is constructed with PipelineWrite or PipelineReadWrite. -func (p *Pipeline) LinkWriterTo(elem *Element) { p.srcElement = elem } - -// LinkReaderTo links the read buffer on this Pipeline to the given element. This must -// be called when the pipeline is constructed with PipelineRead or PipelineReadWrite. -func (p *Pipeline) LinkReaderTo(elem *Element) { p.destElement = elem } - -// IsUsingGstApp returns true if the current pipeline is using GstApp instead of file descriptors. -func (p *Pipeline) IsUsingGstApp() bool { - return p.appSrc != nil || p.appSink != nil -} - -// GetAppSrc returns the AppSrc for this pipeline if created with PipelineUseGstApp. -// Unref after usage. -func (p *Pipeline) GetAppSrc() *AppSrc { - if p.appSrc == nil { - return nil - } - // increases the ref count on the element - return wrapAppSrc(p.appSrc.Element) -} - -// GetAppSink returns the AppSink for this pipeline if created with PipelineUseGstApp. -// Unref after usage. -func (p *Pipeline) GetAppSink() *AppSink { - if p.appSink == nil { - return nil - } - // increases the ref count - return wrapAppSink(p.appSink.Element) -} - // GetBus returns the message bus for this pipeline. func (p *Pipeline) GetBus() *Bus { if p.bus == nil { @@ -250,245 +72,23 @@ func (p *Pipeline) GetBus() *Bus { return p.bus } -// SetAutoFlush sets whether or not samples should be automatically flushed to the read-buffer -// (default for pipelines not built with PipelineUseGstApp) and if messages should be flushed -// on the bus when the pipeline is stopped. -func (p *Pipeline) SetAutoFlush(b bool) { - p.Set("auto-flush-bus", b) - p.autoFlush = b -} - -// AutoFlush returns true if the pipeline is using a GstAppSink and is configured to autoflush to the -// read-buffer. -func (p *Pipeline) AutoFlush() bool { return p.IsUsingGstApp() && p.autoFlush } - -// Flush flushes the app sink to the read buffer. It is usually more desirable to interface -// with the PullSample and BlockPullSample methods on the AppSink interface directly. Or -// to set autoflush to true. -func (p *Pipeline) Flush() error { - sample, err := p.appSink.PullSample() - if err != nil { // err signals end of stream - return err - } - if sample == nil { - return nil - } - defer sample.Unref() - if _, err := io.Copy(p.destWriter, sample.GetBuffer()); err != nil { - return err - } - return nil -} - -// BlockFlush is like Flush but it blocks until a sample is available. This is intended for -// use with PipelineUseGstApp. -func (p *Pipeline) BlockFlush() error { - sample, err := p.appSink.BlockPullSample() - if err != nil { // err signals end of stream - return err - } - if sample == nil { - return nil - } - defer sample.Unref() - if _, err := io.Copy(p.destWriter, sample.GetBuffer()); err != nil { - return err - } - return nil -} - -// setupSrc sets up a source element with the given configuration. -func (p *Pipeline) setupSrc(pluginName string, args map[string]interface{}) (*Element, error) { - elem, err := NewElement(pluginName) - if err != nil { - return nil, err - } - for k, v := range args { - if err := elem.Set(k, v); err != nil { - return nil, err - } - } - if err := p.Add(elem); err != nil { - return nil, err - } - if p.srcCaps != nil { - return elem, elem.LinkFiltered(p.srcElement, p.srcCaps) - } - return elem, elem.Link(p.srcElement) -} - -// setupFdSrc will setup a fdsrc as the source of the pipeline. -func (p *Pipeline) setupFdSrc() error { - _, err := p.setupSrc("fdsrc", map[string]interface{}{ - "fd": p.writerFd(), - }) - return err -} - -// setupAppSrc sets up an appsrc as the source of the pipeline -func (p *Pipeline) setupAppSrc() error { - appSrc, err := p.setupSrc("appsrc", map[string]interface{}{ - "block": true, // TODO: make this configurable - "emit-signals": false, // https://gstreamer.freedesktop.org/documentation/app/appsrc.html?gi-language=c - }) - if err != nil { - return err - } - p.appSrc = &AppSrc{appSrc} - return nil -} - -// setupSrcElement will setup the source element when the pipeline is constructed with -// PipelineWrite. -func (p *Pipeline) setupSrcElement() error { - if p.srcElement == nil { - return errors.New("Pipeline was constructed with PipelineWrite but LinkWriterTo was never called") - } - if p.IsUsingGstApp() { - return p.setupAppSrc() - } - return p.setupFdSrc() -} - -// setupSink sets up a sink element with the given congifuration. -func (p *Pipeline) setupSink(pluginName string, args map[string]interface{}) (*Element, error) { - elem, err := NewElement(pluginName) - if err != nil { - return nil, err - } - for k, v := range args { - if err := elem.Set(k, v); err != nil { - return nil, err - } - } - if err := p.Add(elem); err != nil { - return nil, err - } - return elem, p.destElement.Link(elem) -} - -// setupFdSink sets up a fdsink as the sink of the pipeline. -func (p *Pipeline) setupFdSink() error { - _, err := p.setupSink("fdsink", map[string]interface{}{ - "fd": p.readerFd(), - }) - return err -} - -// setupAppSink sets up an appsink as the sink of the pipeline. -func (p *Pipeline) setupAppSink() error { - appSink, err := p.setupSink("appsink", map[string]interface{}{ - "emit-signals": false, - }) - if err != nil { - return err - } - p.appSink = wrapAppSink(appSink) - return nil -} - -// setupDestElement will setup the destination (sink) element when the pipeline is constructed with -// PipelineRead. -func (p *Pipeline) setupDestElement() error { - if p.destElement == nil { - return errors.New("Pipeline was constructed with PipelineRead but LinkReaderTo was never called") - } - if p.IsUsingGstApp() { - return p.setupAppSink() - } - return p.setupFdSink() -} - // Start will start the GstPipeline. It is asynchronous so it does not need to be // called within a goroutine, however, it is still safe to do so. func (p *Pipeline) Start() error { - // If there is a write buffer on this pipeline, set up an fdsrc - if p.srcBuf != nil && !p.pipelineFromHelper { - if err := p.setupSrcElement(); err != nil { - return err - } - } - - // If there is a read buffer on this pipeline, set up an fdsink - if p.destBuf != nil && !p.pipelineFromHelper { - if err := p.setupDestElement(); err != nil { - return err - } - } - - return p.startPipeline() + return p.SetState(StatePlaying) } -func (p *Pipeline) closeBuffers() error { - if p.srcBuf != nil && p.srcReader != nil && p.srcWriter != nil { - if err := p.srcReader.Close(); err != nil { - return err - } - if err := p.srcWriter.Close(); err != nil { - return err - } - p.srcBuf = nil - } - if p.destBuf != nil && p.destReader != nil && p.destWriter != nil { - if err := p.destReader.Close(); err != nil { - return err - } - if err := p.destWriter.Close(); err != nil { - return err - } - p.destBuf = nil +// Destroy will attempt to stop the pipeline and then unref once the stream has +// fully completed. +func (p *Pipeline) Destroy() error { + if err := p.BlockSetState(StateNull); err != nil { + return err } + p.Unref() return nil } -// ReadBufferSize returns the current size of the unread portion of the read-buffer. -func (p *Pipeline) ReadBufferSize() int { - if p.destBuf == nil { - return 0 - } - return p.destBuf.Buffered() -} - -// WriteBufferSize returns the current size of the unread portion of the write-buffer. -func (p *Pipeline) WriteBufferSize() int { - if p.srcBuf == nil { - return 0 - } - return p.srcBuf.Buffered() -} - -// TotalBufferSize returns the sum of the Read and Write buffer unread portions. -func (p *Pipeline) TotalBufferSize() int { return p.WriteBufferSize() + p.ReadBufferSize() } - -// Close implements a Closer and closes all buffers. -func (p *Pipeline) Close() error { - defer p.Unref() - if err := p.closeBuffers(); err != nil { - return err - } - return p.SetState(StateNull) -} - -// startPipeline will set the GstPipeline to the PLAYING state. -func (p *Pipeline) startPipeline() error { - if err := p.SetState(StatePlaying); err != nil { - return err - } - // If using GstApp with autoflush - if p.AutoFlush() { - go func() { - for { - if err := p.BlockFlush(); err != nil { - // err signals end of stream - return - } - } - }() - } - return nil -} - -// Wait waits for the given pipeline to reach end of stream. +// Wait waits for the given pipeline to reach end of stream or be stopped. func Wait(p *Pipeline) { if p.Instance() == nil { return diff --git a/gst/gst_pipeline_config.go b/gst/gst_pipeline_config.go deleted file mode 100644 index 54e7425..0000000 --- a/gst/gst_pipeline_config.go +++ /dev/null @@ -1,194 +0,0 @@ -package gst - -import "fmt" - -// PipelineConfig represents a list of elements and their configurations -// to be used with NewPipelineFromConfig. -type PipelineConfig struct { - Elements []*PipelineElement -} - -// GetElementByName returns the Element configuration for the given name. -func (p *PipelineConfig) GetElementByName(name string) *PipelineElement { - for _, elem := range p.Elements { - if name == elem.GetName() { - return elem - } - } - return nil -} - -// ElementNames returns a string slice of the names of all the plugins. -func (p *PipelineConfig) ElementNames() []string { - names := make([]string, 0) - for _, elem := range p.Elements { - names = append(names, elem.GetName()) - } - return names -} - -// pushPluginToTop pushes a plugin to the top of the list. -func (p *PipelineConfig) pushPluginToTop(elem *PipelineElement) { - newSlc := []*PipelineElement{elem} - newSlc = append(newSlc, p.Elements...) - p.Elements = newSlc -} - -// PipelineElement represents an `GstElement` in a `GstPipeline` when building a Pipeline with `NewPipelineFromConfig`. -// The Name should coorespond to a valid gstreamer plugin name. The data are additional -// fields to set on the element. If SinkCaps is non-nil, they are applied to the sink of this -// element. -type PipelineElement struct { - Name string - SinkCaps *Caps - Data map[string]interface{} -} - -// GetName returns the name to use when creating Elements from this configuration. -func (p *PipelineElement) GetName() string { return p.Name } - -// NewPipelineFromConfig builds a new pipeline from the given PipelineConfig. The plugins provided -// in the configuration will be linked in the order they are given. -// If using PipelineWrite, you can optionally pass a Caps object to filter between the write-buffer -// and the start of the pipeline. -func NewPipelineFromConfig(cfg *PipelineConfig, flags PipelineFlags, caps *Caps) (pipeline *Pipeline, err error) { - // create a new empty pipeline instance - pipeline, err = NewPipeline(flags) - if err != nil { - return nil, err - } - // if any error happens while setting up the pipeline, immediately free it - defer func() { - if err != nil { - if cerr := pipeline.Close(); cerr != nil { - fmt.Println("Failed to close pipeline:", err) - } - } - }() - - if cfg.Elements == nil { - cfg.Elements = make([]*PipelineElement, 0) - } - - if flags.has(PipelineWrite) { - if flags.has(PipelineUseGstApp) { - cfg.pushPluginToTop(&PipelineElement{ - Name: "appsrc", - Data: map[string]interface{}{ - "block": true, // TODO: make these all configurable - "emit-signals": false, // https://gstreamer.freedesktop.org/documentation/app/appsrc.html?gi-language=c - "is-live": true, - "max-bytes": 200000, - // "size": 0, // If this is known we should specify it - }, - SinkCaps: caps, - }) - } else { - cfg.pushPluginToTop(&PipelineElement{ - Name: "fdsrc", - Data: map[string]interface{}{ - "fd": pipeline.writerFd(), - }, - SinkCaps: caps, - }) - } - } - - if flags.has(PipelineRead) { - if flags.has(PipelineUseGstApp) { - cfg.Elements = append(cfg.Elements, &PipelineElement{ - Name: "appsink", - Data: map[string]interface{}{ - "emit-signals": false, - }, - }) - } else { - cfg.Elements = append(cfg.Elements, &PipelineElement{ - Name: "fdsink", - Data: map[string]interface{}{ - "fd": pipeline.readerFd(), - }, - }) - } - } - - // retrieve a list of the plugin names - pluginNames := cfg.ElementNames() - - // build all the elements - var elements map[int]*Element - elements, err = NewElementMany(pluginNames...) - if err != nil { - return - } - - // iterate the plugin names and add them to the pipeline - for idx, name := range pluginNames { - // get the current plugin and element - currentPlugin := cfg.GetElementByName(name) - currentElem := elements[idx] - - // Iterate any data with the plugin and set it on the element - for key, value := range currentPlugin.Data { - if err = currentElem.Set(key, value); err != nil { - return - } - } - - // Add the element to the pipeline - if err = pipeline.Add(currentElem); err != nil { - return - } - - // If this is the first element continue - if idx == 0 { - continue - } - - // get the last element in the chain - lastPluginName := pluginNames[idx-1] - lastElem := elements[idx-1] - lastPlugin := cfg.GetElementByName(lastPluginName) - - if lastPlugin == nil { - // this should never happen, since only used internally, - // but safety from panic - continue - } - - // If this is the second element and we are configuring writing - // call link on the last element - if idx == 1 && flags.has(PipelineWrite) { - pipeline.LinkWriterTo(lastElem) - if flags.has(PipelineUseGstApp) { - pipeline.appSrc = wrapAppSrc(lastElem) - } - } - - // If this is the last element and we are configuring reading - // call link on the element - if idx == len(pluginNames)-1 && flags.has(PipelineRead) { - pipeline.LinkReaderTo(currentElem) - if flags.has(PipelineUseGstApp) { - pipeline.appSink = wrapAppSink(currentElem) - } - } - - // If there are sink caps on the last element, do a filtered link to this one and continue - if lastPlugin.SinkCaps != nil { - if err = lastElem.LinkFiltered(currentElem, lastPlugin.SinkCaps); err != nil { - return - } - continue - } - - // link the last element to this element - if err = lastElem.Link(currentElem); err != nil { - return - } - } - - pipeline.pipelineFromHelper = true - - return -} diff --git a/gst/gst_pipeline_string.go b/gst/gst_pipeline_string.go deleted file mode 100644 index c183409..0000000 --- a/gst/gst_pipeline_string.go +++ /dev/null @@ -1,145 +0,0 @@ -package gst - -import ( - "errors" - "fmt" - "strings" - "unsafe" - - "github.com/gotk3/gotk3/glib" -) - -// NewPipelineFromLaunchString returns a new GstPipeline from the given launch string. If flags -// contain PipelineRead or PipelineWrite, the launch string is further formatted accordingly. -// -// If using PipelineWrite, you should generally start your pipeline with the caps of the source. -func NewPipelineFromLaunchString(launchStr string, flags PipelineFlags) (*Pipeline, error) { - // reformat the string to point at the writerFd - if flags.has(PipelineWrite) { - - if flags.has(PipelineUseGstApp) { - if launchStr == "" { - launchStr = "appsrc" - } else { - launchStr = fmt.Sprintf("appsrc ! %s", launchStr) - } - } else { - if launchStr == "" { - launchStr = "fdsrc" - } else { - launchStr = fmt.Sprintf("fdsrc ! %s", launchStr) - } - } - - } - - if flags.has(PipelineRead) { - - if flags.has(PipelineUseGstApp) { - if launchStr == "" { - launchStr = "appsink emit-signals=false" - } else { - launchStr = fmt.Sprintf("%s ! appsink emit-signals=false", launchStr) - } - } else { - if launchStr == "" { - launchStr = "fdsink" - } else { - launchStr = fmt.Sprintf("%s ! fdsink", launchStr) - } - } - - } - - pipelineElement, err := newPipelineFromString(launchStr) - if err != nil { - return nil, err - } - - pipeline := wrapPipeline(glib.Take(unsafe.Pointer(pipelineElement))) - - if err := applyFlags(pipeline, flags); err != nil { - return nil, err - } - - if flags.has(PipelineWrite) { - - sources, err := pipeline.GetSourceElements() - if err != nil { - return nil, err - } - - var srcType string - if flags.has(PipelineUseGstApp) { - srcType = "appsrc" - } else { - srcType = "fdsrc" - } - - var pipelineSrc *Element - for _, src := range sources { - if strings.Contains(src.Name(), srcType) { - pipelineSrc = src - } else { - src.Unref() - } - } - - if pipelineSrc == nil { - return nil, errors.New("Could not detect pipeline source") - } - - defer pipelineSrc.Unref() - - if flags.has(PipelineUseGstApp) { - pipeline.appSrc = wrapAppSrc(pipelineSrc) - } else { - if err := pipelineSrc.Set("fd", pipeline.writerFd()); err != nil { - return nil, err - } - } - } - - if flags.has(PipelineRead) { - sinks, err := pipeline.GetSinkElements() - if err != nil { - return nil, err - } - - var sinkType string - if flags.has(PipelineUseGstApp) { - sinkType = "appsink" - } else { - sinkType = "fdsink" - } - - var pipelineSink *Element - for _, sink := range sinks { - if strings.Contains(sink.Name(), sinkType) { - pipelineSink = sink - } else { - sink.Unref() - } - } - - if pipelineSink == nil { - return nil, errors.New("Could not detect pipeline sink") - } - - defer pipelineSink.Unref() - - if flags.has(PipelineUseGstApp) { - pipeline.appSink = wrapAppSink(pipelineSink) - } else { - if err := pipelineSink.Set("fd", pipeline.readerFd()); err != nil { - return nil, err - } - } - - } - - // signal that this pipeline was made from a string and therefore already linked - pipeline.pipelineFromHelper = true - - return pipeline, err -} diff --git a/gst/gst_structure.go b/gst/gst_structure.go index 1bcc0e4..2a671d3 100644 --- a/gst/gst_structure.go +++ b/gst/gst_structure.go @@ -42,9 +42,13 @@ func NewStructureFromString(stStr string) *Structure { return wrapStructure(structure) } -// StructureFromGValue extracts the GstStructure from a glib.Value. +// StructureFromGValue extracts the GstStructure from a glib.Value, or nil +// if one does not exist. func StructureFromGValue(gval *glib.Value) *Structure { st := C.gst_value_get_structure((*C.GValue)(gval.Native())) + if st == nil { + return nil + } return wrapStructure(st) } diff --git a/gst/gstauto/bufio.go b/gst/gstauto/bufio.go new file mode 100644 index 0000000..8842e00 --- /dev/null +++ b/gst/gstauto/bufio.go @@ -0,0 +1,101 @@ +package gstauto + +import ( + "bufio" + "io" + "os" +) + +// Blank assertions to ensure interfaces are implemented. +var _ io.ReadCloser = &readCloser{} +var _ io.WriteCloser = &writeCloser{} +var _ io.ReadWriteCloser = &readWriteCloser{} + +// readCloser is a struct that provides a read buffer that can also be written to +// internally. +type readCloser struct { + rReader, rWriter *os.File + rBuf *bufio.Reader +} + +// newReadCloser returns a new readCloser. +func newReadCloser() (*readCloser, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + return &readCloser{ + rReader: r, + rWriter: w, + rBuf: bufio.NewReader(r), + }, nil +} + +// Read implements a Reader for objects embdedding this struct. +func (r *readCloser) Read(p []byte) (int, error) { return r.rBuf.Read(p) } + +// Close implements a Closer for objects embedding this struct. +func (r *readCloser) Close() error { + if err := r.rWriter.Close(); err != nil { + return err + } + return r.rReader.Close() +} + +// writeCloser is a struct that provides a read buffer that can also be +// read from internally. +type writeCloser struct { + wReader, wWriter *os.File + wBuf *bufio.Writer +} + +// newWriteCloser returns a new writeCloser. +func newWriteCloser() (*writeCloser, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + return &writeCloser{ + wReader: r, + wWriter: w, + wBuf: bufio.NewWriter(w), + }, nil +} + +// Write implements a Writer for objects embedding this struct. +func (w *writeCloser) Write(p []byte) (int, error) { return w.wBuf.Write(p) } + +// Close implements a Closer for objects embedding this struct. +func (w *writeCloser) Close() error { + if err := w.wWriter.Close(); err != nil { + return err + } + return w.wReader.Close() +} + +// readWriteCloser is a struct that provides both read and write buffers. +type readWriteCloser struct { + *readCloser + *writeCloser +} + +// newReadWriteCloser returns a new readWriteCloser. +func newReadWriteCloser() (*readWriteCloser, error) { + rCloser, err := newReadCloser() + if err != nil { + return nil, err + } + wCloser, err := newWriteCloser() + if err != nil { + return nil, err + } + return &readWriteCloser{readCloser: rCloser, writeCloser: wCloser}, nil +} + +// Close implements a Closer for objects embedding this struct. +func (rw *readWriteCloser) Close() error { + if err := rw.writeCloser.Close(); err != nil { + return err + } + return rw.readCloser.Close() +} diff --git a/gst/gstauto/doc.go b/gst/gstauto/doc.go new file mode 100644 index 0000000..c5624f3 --- /dev/null +++ b/gst/gstauto/doc.go @@ -0,0 +1,4 @@ +// Package gstauto contains helper methods and objects for building pipelines that +// satisfy most use cases. It provides an abstraction over the lower-level building +// blocks provided in the gst package. +package gstauto diff --git a/gst/gstauto/pipeline_config.go b/gst/gstauto/pipeline_config.go new file mode 100644 index 0000000..6d07986 --- /dev/null +++ b/gst/gstauto/pipeline_config.go @@ -0,0 +1,141 @@ +package gstauto + +import ( + "errors" + "fmt" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// PipelineElement represents an `GstElement` in a `GstPipeline` when building a Pipeline with `NewPipelineFromConfig`. +// The Name should coorespond to a valid gstreamer plugin name. The data are additional +// fields to set on the element. If SinkCaps is non-nil, they are applied to the sink of this +// element. +type PipelineElement struct { + Name string + SinkCaps *gst.Caps + Data map[string]interface{} +} + +// GetName returns the name to use when creating Elements from this configuration. +func (p *PipelineElement) GetName() string { return p.Name } + +// PipelineConfig represents a list of elements and their configurations +// to be used with NewPipelineFromConfig. +type PipelineConfig struct { + Elements []*PipelineElement +} + +// GetElementByName returns the Element configuration for the given name. +func (p *PipelineConfig) GetElementByName(name string) *PipelineElement { + for _, elem := range p.Elements { + if name == elem.GetName() { + return elem + } + } + return nil +} + +// ElementNames returns a string slice of the names of all the plugins. +func (p *PipelineConfig) ElementNames() []string { + names := make([]string, 0) + for _, elem := range p.Elements { + names = append(names, elem.GetName()) + } + return names +} + +// pushPluginToTop pushes a plugin to the top of the list. +func (p *PipelineConfig) pushPluginToTop(elem *PipelineElement) { + newSlc := []*PipelineElement{elem} + newSlc = append(newSlc, p.Elements...) + p.Elements = newSlc +} + +// Apply applies this configuration to the given Pipeline. +func (p *PipelineConfig) Apply(pipeline *gst.Pipeline) error { + // build all the elements + elementNames := p.ElementNames() + elements, err := gst.NewElementMany(elementNames...) + if err != nil { + return err + } + // iterate the element names and add them to the pipeline + for idx, name := range elementNames { + // get the current config and element + currentCfg := p.GetElementByName(name) + currentElem := elements[idx] + + // Iterate any data with the plugin and set it on the element + for key, value := range currentCfg.Data { + if err := currentElem.Set(key, value); err != nil { + return err + } + } + + // Add the element to the pipeline + if err := pipeline.Add(currentElem); err != nil { + return err + } + + // If this is the first element continue + if idx == 0 { + continue + } + + // get the last element in the chain + lastElemName := elementNames[idx-1] + lastElem := elements[idx-1] + lastCfg := p.GetElementByName(lastElemName) + + if lastCfg == nil { + // this would never happen unless someone is messing with memory, + // but safety from panic + continue + } + + // If there are sink caps on the last element, do a filtered link to this one and continue + if lastCfg.SinkCaps != nil { + if err := lastElem.LinkFiltered(currentElem, lastCfg.SinkCaps); err != nil { + return err + } + continue + } + + // link the last element to this element + if err := lastElem.Link(currentElem); err != nil { + return err + } + } + + return nil +} + +// NewPipelineFromConfig builds a new pipeline from the given PipelineConfig. The plugins provided +// in the configuration will be linked in the order they are given. +func NewPipelineFromConfig(cfg *PipelineConfig) (*gst.Pipeline, error) { + if cfg.Elements == nil { + return nil, errors.New("Element cannot be empty in the configuration") + } + + // create a new empty pipeline instance + pipeline, err := gst.NewPipeline("") + if err != nil { + return nil, err + } + + // if any error happens while setting up the pipeline, immediately free it + defer func() { + if err != nil { + if destroyErr := pipeline.Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", err.Error()) + } + } + }() + + if err = cfg.Apply(pipeline); err != nil { + return nil, err + } + + return pipeline, nil +} diff --git a/gst/gstauto/pipeline_interfaces.go b/gst/gstauto/pipeline_interfaces.go new file mode 100644 index 0000000..fc4b142 --- /dev/null +++ b/gst/gstauto/pipeline_interfaces.go @@ -0,0 +1,32 @@ +package gstauto + +import ( + "io" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// Pipeliner is a the base interface for structs extending the functionality of +// the Pipeline object. It provides a single method which returns the underlying +// Pipeline object. +type Pipeliner interface { + Pipeline() *gst.Pipeline +} + +// ReadPipeliner is a Pipeliner that also implements a ReadCloser. +type ReadPipeliner interface { + Pipeliner + io.ReadCloser +} + +// WritePipeliner is a Pipeliner that also implements a WriteCloser. +type WritePipeliner interface { + Pipeliner + io.WriteCloser +} + +// ReadWritePipeliner is a Pipeliner that also implements a ReadWriteCloser. +type ReadWritePipeliner interface { + ReadPipeliner + WritePipeliner +} diff --git a/gst/gstauto/pipeline_reader.go b/gst/gstauto/pipeline_reader.go new file mode 100644 index 0000000..2415bd5 --- /dev/null +++ b/gst/gstauto/pipeline_reader.go @@ -0,0 +1,79 @@ +package gstauto + +import ( + "fmt" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// Empty assignment to ensure PipelineReader satisfies the ReadPipeliner interface. +var _ ReadPipeliner = &PipelineReader{} + +// PipelineReader is the base struct to be used to implement ReadPipeliners. +type PipelineReader struct { + *readCloser + pipeline *gst.Pipeline +} + +// NewPipelineReader returns a new PipelineReader with an empty pipeline. Use an empty name +// to have gstreamer auto-generate one. This method is intended for use in the construction +// of other interfaces. +func NewPipelineReader(name string) (*PipelineReader, error) { + pipeline, err := gst.NewPipeline(name) + if err != nil { + return nil, err + } + rCloser, err := newReadCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineReader{ + readCloser: rCloser, + pipeline: pipeline, + }, nil +} + +// NewPipelineReaderFromString returns a new PipelineReader with a pipeline populated +// by the provided gstreamer launch string. If you are looking to build a simple +// ReadPipeliner you probably want to use NewPipelineReaderSimpleFromString. +func NewPipelineReaderFromString(launchStr string) (*PipelineReader, error) { + pipeline, err := gst.NewPipelineFromString(launchStr) + if err != nil { + return nil, err + } + rCloser, err := newReadCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineReader{ + readCloser: rCloser, + pipeline: pipeline, + }, nil +} + +// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the +// Pipeliner interface. +func (r *PipelineReader) Pipeline() *gst.Pipeline { return r.pipeline } + +// ReaderFd returns the file descriptor that can be written to for the read-buffer. This value +// is used when wanting to allow an underlying pipeline to write to the internal buffer (e.g. when using a fdsink). +func (r *PipelineReader) ReaderFd() uintptr { return r.readCloser.rWriter.Fd() } + +// Close will stop and unref the underlying pipeline. +func (r *PipelineReader) Close() error { + if err := r.Pipeline().Destroy(); err != nil { + return err + } + return r.readCloser.Close() +} + +// CloseAsync will close the underlying pipeline asynchronously. It is the caller's +// responsibility to call Unref on the pipeline and close buffers once it is no longer being used. +// This can be accomplished via calling a regular Close (which is idempotent). +func (r *PipelineReader) CloseAsync() error { return r.pipeline.SetState(gst.StateNull) } diff --git a/gst/gstauto/pipeline_readwriter.go b/gst/gstauto/pipeline_readwriter.go new file mode 100644 index 0000000..7a361b3 --- /dev/null +++ b/gst/gstauto/pipeline_readwriter.go @@ -0,0 +1,85 @@ +package gstauto + +import ( + "fmt" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// Empty assignment to ensure PipelineReadWriter satisfies the ReadWritePipeliner interface. +var _ ReadWritePipeliner = &PipelineReadWriter{} + +// PipelineReadWriter is the base struct to be used to implement ReadWritePipeliners. +type PipelineReadWriter struct { + *readWriteCloser + pipeline *gst.Pipeline +} + +// NewPipelineReadWriter returns a new PipelineReadWriter with an empty pipeline. Use an empty name +// to have gstreamer auto-generate one. This method is intended for use in the construction +// of other interfaces. +func NewPipelineReadWriter(name string) (*PipelineReadWriter, error) { + pipeline, err := gst.NewPipeline(name) + if err != nil { + return nil, err + } + rwCloser, err := newReadWriteCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineReadWriter{ + readWriteCloser: rwCloser, + pipeline: pipeline, + }, nil +} + +// NewPipelineReadWriterFromString returns a new PipelineReadWriter with a pipeline populated +// by the provided gstreamer launch string. If you are looking to build a simple +// ReadWritePipeliner you probably want to use NewPipelineReadWriterSimpleFromString. +func NewPipelineReadWriterFromString(launchStr string) (*PipelineReadWriter, error) { + pipeline, err := gst.NewPipelineFromString(launchStr) + if err != nil { + return nil, err + } + rwCloser, err := newReadWriteCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineReadWriter{ + readWriteCloser: rwCloser, + pipeline: pipeline, + }, nil +} + +// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the +// Pipeliner interface. +func (rw *PipelineReadWriter) Pipeline() *gst.Pipeline { return rw.pipeline } + +// ReaderFd returns the file descriptor that can be written to for the read-buffer. This value +// is used when wanting to allow an underlying pipeline to write to the internal buffer +// (e.g. when using a fdsink). +func (rw *PipelineReadWriter) ReaderFd() uintptr { return rw.readWriteCloser.readCloser.rWriter.Fd() } + +// WriterFd returns the file descriptor that can be used to read from the write-buffer. This value +// is used when wanting to allow an underlying pipeline the ability to read data written to the buffer +// (e.g. when using a fdsrc). +func (rw *PipelineReadWriter) WriterFd() uintptr { return rw.readWriteCloser.writeCloser.wReader.Fd() } + +// Close will stop and unref the underlying pipeline and read/write buffers. +func (rw *PipelineReadWriter) Close() error { + if err := rw.Pipeline().Destroy(); err != nil { + return err + } + return rw.readWriteCloser.Close() +} + +// CloseAsync will close the underlying pipeline asynchronously. It is the caller's +// responsibility to call Unref on the pipeline and close buffers once it is no longer being used. +// This can be accomplished via calling a regular Close (which is idempotent). +func (rw *PipelineReadWriter) CloseAsync() error { return rw.pipeline.SetState(gst.StateNull) } diff --git a/gst/gstauto/pipeline_simple.go b/gst/gstauto/pipeline_simple.go new file mode 100644 index 0000000..fef303d --- /dev/null +++ b/gst/gstauto/pipeline_simple.go @@ -0,0 +1,57 @@ +package gstauto + +import ( + "fmt" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// Blank assignment to make sure PipelinerSimple satisfies the Pipeliner interface. +var _ Pipeliner = &PipelinerSimple{} + +// PipelinerSimple is a simple struct that implements the Pipeliner interface. +// It doesn't provide any additional read/write capabilities. Its primary intention +// is for pipelines where the caller does not wish to personally read or write from +// either end of the buffer. +type PipelinerSimple struct { + pipeline *gst.Pipeline +} + +// Pipeline implements the Pipeliner interface. +func (s *PipelinerSimple) Pipeline() *gst.Pipeline { return s.pipeline } + +// NewPipelinerSimple returns a new empty PipelinerSimple. Pass an empty string +// for name to use an auto-generated one. +func NewPipelinerSimple(name string) (*PipelinerSimple, error) { + pipeline, err := gst.NewPipeline(name) + if err != nil { + return nil, err + } + return &PipelinerSimple{pipeline: pipeline}, nil +} + +// NewPipelinerSimpleFromString returns a new PipelinerSimpler from the given +// launch string. +func NewPipelinerSimpleFromString(launchStr string) (*PipelinerSimple, error) { + pipeline, err := gst.NewPipelineFromString(launchStr) + if err != nil { + return nil, err + } + return &PipelinerSimple{pipeline: pipeline}, nil +} + +// NewPipelinerSimpleFromConfig returns a new PipelinerSimple from the given +// PipelineConfig. +func NewPipelinerSimpleFromConfig(cfg *PipelineConfig) (*PipelinerSimple, error) { + pipeline, err := gst.NewPipeline("") + if err != nil { + return nil, err + } + if err := cfg.Apply(pipeline); err != nil { + if destroyErr := pipeline.Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + return nil, err + } + return &PipelinerSimple{pipeline: pipeline}, nil +} diff --git a/gst/gstauto/pipeline_simple_reader.go b/gst/gstauto/pipeline_simple_reader.go new file mode 100644 index 0000000..5139c91 --- /dev/null +++ b/gst/gstauto/pipeline_simple_reader.go @@ -0,0 +1,85 @@ +package gstauto + +import ( + "errors" + "fmt" + "strings" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// PipelineReaderSimple implements a ReadPipeliner that configures gstreamer +// to write directly to the internal read-buffer via an fdsink. +type PipelineReaderSimple struct { + *PipelineReader +} + +// NewPipelineReaderSimpleFromString returns a new PipelineReaderSimple populated from +// the given launch string. An fdsink is added to the end of the launch string and tied +// to the read buffer. +func NewPipelineReaderSimpleFromString(launchStr string) (*PipelineReaderSimple, error) { + pipelineReader, err := NewPipelineReaderFromString(addFdSinkToStr(launchStr)) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + if destroyErr := pipelineReader.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + } + }() + + // Retrieve the sinks in the pipeline, most of the time there is just one + var sinks []*gst.Element + sinks, err = pipelineReader.Pipeline().GetSinkElements() + if err != nil { + return nil, err + } + + // Fetch the fdsink and reconfigure it to point to the read buffer. + for _, sink := range sinks { + if strings.Contains(sink.Name(), "fdsink") { + if err = sink.Set("fd", pipelineReader.ReaderFd()); err != nil { + return nil, err + } + } + } + + // Return the pipeline + return &PipelineReaderSimple{pipelineReader}, nil +} + +// NewPipelineReaderSimpleFromConfig returns a new PipelineReaderSimple populated from +// the given launch config. An fdsink is added to the end of the launch config and tied +// to the read buffer. +func NewPipelineReaderSimpleFromConfig(cfg *PipelineConfig) (*PipelineReaderSimple, error) { + if cfg.Elements == nil { + return nil, errors.New("Elements cannot be nil in the config") + } + pipelineReader, err := NewPipelineReader("") + if err != nil { + return nil, err + } + cfg.Elements = append(cfg.Elements, &PipelineElement{ + Name: "fdsink", + Data: map[string]interface{}{ + "fd": pipelineReader.ReaderFd(), + }, + }) + if err := cfg.Apply(pipelineReader.Pipeline()); err != nil { + if destroyErr := pipelineReader.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + return nil, err + } + return &PipelineReaderSimple{pipelineReader}, nil +} + +func addFdSinkToStr(pstr string) string { + if pstr == "" { + return "fdsink" + } + return fmt.Sprintf("%s ! fdsink", pstr) +} diff --git a/gst/gstauto/pipeline_simple_readwriter.go b/gst/gstauto/pipeline_simple_readwriter.go new file mode 100644 index 0000000..f476a9d --- /dev/null +++ b/gst/gstauto/pipeline_simple_readwriter.go @@ -0,0 +1,101 @@ +package gstauto + +import ( + "errors" + "fmt" + "strings" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// PipelineReadWriterSimple implements a ReadWritePipeliner that configures gstreamer +// to read from the internal write-buffer via an fdsrc and write to the internal read-buffer +// via an fdsink. +type PipelineReadWriterSimple struct { + *PipelineReadWriter +} + +// NewPipelineReadWriterSimpleFromString returns a new PipelineReadWriterSimple from +// the given launch string. An fdsrc listening on the write buffer and an fdsink to the read buffer +// are formatted into the provided string. +func NewPipelineReadWriterSimpleFromString(launchStr string) (*PipelineReadWriterSimple, error) { + pipelineReadWriter, err := NewPipelineReadWriterFromString(addFdSrcToStr(addFdSinkToStr(launchStr))) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + if destroyErr := pipelineReadWriter.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + } + }() + + // Retrieve the sinks in the pipeline, most of the time there is just one + var sinks []*gst.Element + sinks, err = pipelineReadWriter.Pipeline().GetSinkElements() + if err != nil { + return nil, err + } + + // Fetch the fdsink and reconfigure it to point to the read buffer. + for _, sink := range sinks { + if strings.Contains(sink.Name(), "fdsink") { + if err = sink.Set("fd", pipelineReadWriter.ReaderFd()); err != nil { + return nil, err + } + } + } + + // Retrieve the sources in the pipeline, most of the time there is just one + var sources []*gst.Element + sources, err = pipelineReadWriter.Pipeline().GetSourceElements() + if err != nil { + return nil, err + } + + // Fetch the fdsrc and reconfigure it to point to the write buffer. + for _, source := range sources { + if strings.Contains(source.Name(), "fdsrc") { + if err = source.Set("fd", pipelineReadWriter.WriterFd()); err != nil { + return nil, err + } + } + } + + // Return the pipeline + return &PipelineReadWriterSimple{pipelineReadWriter}, nil +} + +// NewPipelineReadWriterSimpleFromConfig returns a new PipelineReadWriterSimple populated from +// the given launch config. An fdsrc is added to the start of the launch config and tied +// to the write buffer, and an fdsink is added to the end tied to the read-buffer. +func NewPipelineReadWriterSimpleFromConfig(cfg *PipelineConfig) (*PipelineReadWriterSimple, error) { + if cfg.Elements == nil { + return nil, errors.New("Elements cannot be nil in the config") + } + pipelineReadWriter, err := NewPipelineReadWriter("") + if err != nil { + return nil, err + } + cfg.pushPluginToTop(&PipelineElement{ + Name: "fdsrc", + Data: map[string]interface{}{ + "fd": pipelineReadWriter.WriterFd(), + }, + }) + cfg.Elements = append(cfg.Elements, &PipelineElement{ + Name: "fdsink", + Data: map[string]interface{}{ + "fd": pipelineReadWriter.ReaderFd(), + }, + }) + if err := cfg.Apply(pipelineReadWriter.Pipeline()); err != nil { + if destroyErr := pipelineReadWriter.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + return nil, err + } + return &PipelineReadWriterSimple{pipelineReadWriter}, nil +} diff --git a/gst/gstauto/pipeline_simple_writer.go b/gst/gstauto/pipeline_simple_writer.go new file mode 100644 index 0000000..57292e0 --- /dev/null +++ b/gst/gstauto/pipeline_simple_writer.go @@ -0,0 +1,85 @@ +package gstauto + +import ( + "errors" + "fmt" + "strings" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// PipelineWriterSimple implements a WritePipeliner that configures gstreamer +// to read directly from the internal write-buffer via a fdsrc. +type PipelineWriterSimple struct { + *PipelineWriter +} + +// NewPipelineWriterSimpleFromString returns a new PipelineWriterSimple populated from +// the given launch string. An fdsrc is added to the beginning of the string and tied to +// the write buffer. +func NewPipelineWriterSimpleFromString(launchStr string) (*PipelineWriterSimple, error) { + pipelineWriter, err := NewPipelineWriterFromString(addFdSrcToStr(launchStr)) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + if destroyErr := pipelineWriter.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + } + }() + + // Retrieve the sources in the pipeline, most of the time there is just one + var sources []*gst.Element + sources, err = pipelineWriter.Pipeline().GetSourceElements() + if err != nil { + return nil, err + } + + // Fetch the fdsrc and reconfigure it to point to the write buffer. + for _, source := range sources { + if strings.Contains(source.Name(), "fdsrc") { + if err = source.Set("fd", pipelineWriter.WriterFd()); err != nil { + return nil, err + } + } + } + + // Return the pipeline + return &PipelineWriterSimple{pipelineWriter}, nil +} + +// NewPipelineWriterSimpleFromConfig returns a new PipelineWriterSimple populated from +// the given launch config. An fdsrc is added to the start of the launch config and tied +// to the write buffer. +func NewPipelineWriterSimpleFromConfig(cfg *PipelineConfig) (*PipelineWriterSimple, error) { + if cfg.Elements == nil { + return nil, errors.New("Elements cannot be nil in the config") + } + pipelineWriter, err := NewPipelineWriter("") + if err != nil { + return nil, err + } + cfg.pushPluginToTop(&PipelineElement{ + Name: "fdsrc", + Data: map[string]interface{}{ + "fd": pipelineWriter.WriterFd(), + }, + }) + if err := cfg.Apply(pipelineWriter.Pipeline()); err != nil { + if destroyErr := pipelineWriter.Pipeline().Destroy(); destroyErr != nil { + fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error()) + } + return nil, err + } + return &PipelineWriterSimple{pipelineWriter}, nil +} + +func addFdSrcToStr(pstr string) string { + if pstr == "" { + return "fdsrc" + } + return fmt.Sprintf("fdsink ! %s", pstr) +} diff --git a/gst/gstauto/pipeline_writer.go b/gst/gstauto/pipeline_writer.go new file mode 100644 index 0000000..0fd1a1e --- /dev/null +++ b/gst/gstauto/pipeline_writer.go @@ -0,0 +1,80 @@ +package gstauto + +import ( + "fmt" + + "github.com/tinyzimmer/go-gst-launch/gst" +) + +// Empty assignment to ensure PipelineWriter satisfies the WritePipeliner interface. +var _ WritePipeliner = &PipelineWriter{} + +// PipelineWriter is the base struct to be used to implement WritePipeliners. +type PipelineWriter struct { + *writeCloser + pipeline *gst.Pipeline +} + +// NewPipelineWriter returns a new PipelineWriter with an empty pipeline. Use an empty name +// to have gstreamer auto-generate one. This method is intended for use in the construction +// of other interfaces. +func NewPipelineWriter(name string) (*PipelineWriter, error) { + pipeline, err := gst.NewPipeline(name) + if err != nil { + return nil, err + } + wCloser, err := newWriteCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineWriter{ + writeCloser: wCloser, + pipeline: pipeline, + }, nil +} + +// NewPipelineWriterFromString returns a new PipelineWriter with a pipeline populated +// by the provided gstreamer launch string. If you are looking to build a simple +// WritePipeliner you probably want to use NewPipelineWriterSimpleFromString. +func NewPipelineWriterFromString(launchStr string) (*PipelineWriter, error) { + pipeline, err := gst.NewPipelineFromString(launchStr) + if err != nil { + return nil, err + } + wCloser, err := newWriteCloser() + if err != nil { + if closeErr := pipeline.Destroy(); closeErr != nil { + fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error()) + } + return nil, err + } + return &PipelineWriter{ + writeCloser: wCloser, + pipeline: pipeline, + }, nil +} + +// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the +// Pipeliner interface. +func (w *PipelineWriter) Pipeline() *gst.Pipeline { return w.pipeline } + +// WriterFd returns the file descriptor that can be used to read from the write-buffer. This value +// is used when wanting to allow an underlying pipeline the ability to read data written to +// the buffer (e.g. when using a fdsrc). +func (w *PipelineWriter) WriterFd() uintptr { return w.writeCloser.wReader.Fd() } + +// Close will stop and unref the underlying pipeline. +func (w *PipelineWriter) Close() error { + if err := w.Pipeline().Destroy(); err != nil { + return err + } + return w.writeCloser.Close() +} + +// CloseAsync will close the underlying pipeline asynchronously. It is the caller's +// responsibility to call Unref on the pipeline and close buffers once it is no longer being used. +// This can be accomplished via calling a regular Close (which is idempotent). +func (w *PipelineWriter) CloseAsync() error { return w.pipeline.SetState(gst.StateNull) }