implement more functionality and move pipeline wrappers into new "gstauto" package

This commit is contained in:
tinyzimmer
2020-09-26 14:38:11 +03:00
parent 3ebd96be3a
commit aa5ddb96f1
29 changed files with 1011 additions and 999 deletions

1
.gitignore vendored
View File

@@ -16,3 +16,4 @@ vendor/
# Build outputs
dist/
.vscode/
_bin/

View File

@@ -5,3 +5,16 @@ build-cmd:
ARGS ?=
run-cmd: build-cmd
dist/go-gst $(ARGS)
GOLANGCI_VERSION ?= 1.23.8
GOLANGCI_LINT ?= _bin/golangci-lint
GOLANGCI_DOWNLOAD_URL ?= https://github.com/golangci/golangci-lint/releases/download/v${GOLANGCI_VERSION}/golangci-lint-${GOLANGCI_VERSION}-$(shell uname | tr A-Z a-z)-amd64.tar.gz
$(GOLANGCI_LINT):
mkdir -p $(dir $(GOLANGCI_LINT))
cd $(dir $(GOLANGCI_LINT)) && curl -JL $(GOLANGCI_DOWNLOAD_URL) | tar xzf -
chmod +x $(dir $(GOLANGCI_LINT))golangci-lint-$(GOLANGCI_VERSION)-$(shell uname | tr A-Z a-z)-amd64/golangci-lint
ln -s golangci-lint-$(GOLANGCI_VERSION)-$(shell uname | tr A-Z a-z)-amd64/golangci-lint $(GOLANGCI_LINT)
lint: $(GOLANGCI_LINT)
$(GOLANGCI_LINT) run -v --timeout 300s --skip-dirs cmd/

View File

@@ -2,8 +2,8 @@
Go bindings for the gstreamer C library
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-rounded)](https://pkg.go.dev/github.com/tinyzimmer/go-gst/gst)
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/tinyzimmer/go-gst/gst)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-rounded)](https://pkg.go.dev/github.com/tinyzimmer/go-gst)
[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/tinyzimmer/go-gst)
[![GoReportCard example](https://goreportcard.com/badge/github.com/nanomsg/mangos)](https://goreportcard.com/report/github.com/tinyzimmer/go-gst)
This package was originally written to aid the audio support in [`kvdi`](https://github.com/tinyzimmer/kvdi).
@@ -34,7 +34,7 @@ For now the functionality is limitted to GIF encoing and other arbitrary pipelin
If I extend it further I'll publish releases, but for now, you can retrieve it with `go get`.
```bash
go get github.com/tinyzimmer/go-gst-launch/cmd/go-gst-launch
go get github.com/tinyzimmer/go-gst/cmd/go-gst
```
The usage is described below:

View File

@@ -47,16 +47,25 @@ $ yourprogram completion fish > ~/.config/fish/completions/yourprogram.fish
DisableFlagsInUseLine: true,
ValidArgs: []string{"bash", "zsh", "fish", "powershell"},
Args: cobra.ExactValidArgs(1),
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
switch args[0] {
case "bash":
cmd.Root().GenBashCompletion(os.Stdout)
if err := cmd.Root().GenBashCompletion(os.Stdout); err != nil {
return err
}
case "zsh":
cmd.Root().GenZshCompletion(os.Stdout)
if err := cmd.Root().GenZshCompletion(os.Stdout); err != nil {
return err
}
case "fish":
cmd.Root().GenFishCompletion(os.Stdout, true)
if err := cmd.Root().GenFishCompletion(os.Stdout, true); err != nil {
return err
}
case "powershell":
cmd.Root().GenPowerShellCompletion(os.Stdout)
if err := cmd.Root().GenPowerShellCompletion(os.Stdout); err != nil {
return err
}
}
return nil
},
}

View File

@@ -66,6 +66,9 @@ func gifEncode(cmd *cobra.Command, args []string) error {
}
tmpDir, err := ioutil.TempDir("", "")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
launchStr := fmt.Sprintf(
@@ -75,11 +78,11 @@ func gifEncode(cmd *cobra.Command, args []string) error {
logInfo("gif", "Converting video to image frames")
gstPipeline, err := gst.NewPipelineFromLaunchString(launchStr, gst.PipelineInternalOnly)
gstPipeline, err := gst.NewPipelineFromString(launchStr)
if err != nil {
return err
}
defer gstPipeline.Close()
defer gstPipeline.Destroy()
if err := gstPipeline.Start(); err != nil {
return err

View File

@@ -3,10 +3,12 @@ package main
import (
"errors"
"io"
"os"
"strings"
"github.com/spf13/cobra"
"github.com/tinyzimmer/go-gst-launch/gst"
"github.com/tinyzimmer/go-gst-launch/gst/gstauto"
)
func init() {
@@ -16,7 +18,7 @@ func init() {
var launchCmd = &cobra.Command{
Use: "launch",
Short: "Run a generic pipeline",
Long: `Uses the provided pipeline string to encode/decode the data in the pipeline.`,
Long: `Uses the provided gstreamer string to encode/decode the data in the pipeline.`,
PreRunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return errors.New("The pipeline string cannot be empty")
@@ -33,41 +35,50 @@ func launch(cmd *cobra.Command, args []string) error {
return err
}
var flags gst.PipelineFlags
if src != nil {
flags = flags | gst.PipelineWrite
}
if dest != nil {
flags = flags | gst.PipelineRead
}
pipelineString := strings.Join(args, " ")
logInfo("pipeline", "Creating pipeline")
gstPipeline, err := gst.NewPipelineFromLaunchString(pipelineString, flags)
pipeliner, err := getPipeline(src, dest, pipelineString)
if err != nil {
return err
}
defer gstPipeline.Close()
if verbose {
setupVerbosePipelineListeners(gstPipeline, "pipeline")
setupVerbosePipelineListeners(pipeliner.Pipeline(), "pipeline")
}
logInfo("pipeline", "Starting pipeline")
if err := gstPipeline.Start(); err != nil {
if err := pipeliner.Pipeline().Start(); err != nil {
return err
}
if src != nil {
go io.Copy(gstPipeline, src)
pipelineWriter := pipeliner.(gstauto.WritePipeliner)
go io.Copy(pipelineWriter, src)
defer pipelineWriter.Close()
}
if dest != nil {
go io.Copy(dest, gstPipeline)
pipelineReader := pipeliner.(gstauto.ReadPipeliner)
go io.Copy(dest, pipelineReader)
defer pipelineReader.Close()
} else {
defer pipeliner.Pipeline().Destroy()
}
gst.Wait(gstPipeline)
gst.Wait(pipeliner.Pipeline())
return nil
}
func getPipeline(src, dest *os.File, pipelineString string) (gstauto.Pipeliner, error) {
if src != nil && dest != nil {
return gstauto.NewPipelineReadWriterSimpleFromString(pipelineString)
}
if src != nil {
return gstauto.NewPipelineWriterSimpleFromString(pipelineString)
}
if dest != nil {
return gstauto.NewPipelineReaderSimpleFromString(pipelineString)
}
return gstauto.NewPipelinerSimpleFromString(pipelineString)
}

View File

@@ -8,8 +8,8 @@ import (
)
var (
srcFile, destFile, pipelineStr string
verbose, fromStdin, toStdout bool
srcFile, destFile string
verbose, fromStdin, toStdout bool
rootCmd = &cobra.Command{
Use: "go-gst",

View File

@@ -273,10 +273,12 @@ func printObjectPropertiesInfo(obj *gst.Object, description string) {
colorGreen.printf(`"%s"`, param.ValueType.Name())
if param.ValueType.Name() == "GstStructure" {
structure := gst.StructureFromGValue(param.DefaultValue)
for key, val := range structure.Values() {
colorReset.printIndent(26, "(gpointer) ")
colorYellow.printf("%15s:", key)
colorBlue.printf("%v", val)
if structure != nil {
for key, val := range structure.Values() {
colorReset.printIndent(26, "(gpointer) ")
colorYellow.printf("%15s:", key)
colorBlue.printf("%v", val)
}
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/spf13/cobra"
"github.com/tinyzimmer/go-gst-launch/gst"
"github.com/tinyzimmer/go-gst-launch/gst/gstauto"
"golang.org/x/net/websocket"
)
@@ -109,72 +110,69 @@ func handleWebsocketConnection(wsconn *websocket.Conn) {
logInfo("websocket", "New connection from", wsconn.Request().RemoteAddr)
wsconn.PayloadType = websocket.BinaryFrame
var playbackPipeline, recordingPipeline, sinkPipeline *gst.Pipeline
var err error
playbackPipeline, err = newPlaybackPipeline()
playbackPipeline, err := newPlaybackPipeline()
if err != nil {
logInfo("websocket", "ERROR:", err.Error())
return
}
playbackPipeline.SetAutoFlush(true)
logInfo("websocket", "Starting playback pipeline")
if err = playbackPipeline.Start(); err != nil {
if err = playbackPipeline.Pipeline().Start(); err != nil {
logInfo("websocket", "ERROR:", err.Error())
return
}
if verbose {
setupVerbosePipelineListeners(playbackPipeline, "playback")
setupVerbosePipelineListeners(playbackPipeline.Pipeline(), "playback")
}
var recordingPipeline gstauto.WritePipeliner
var sinkPipeline gstauto.Pipeliner
if micFifo != "" {
recordingPipeline, err = newRecordingPipeline()
recordingPipeline, err := newRecordingPipeline()
if err != nil {
logInfo("websocket", "Could not open pipeline for recording:", err.Error())
return
}
defer recordingPipeline.Close()
sinkPipeline, err = newSinkPipeline()
sinkPipeline, err := newSinkPipeline()
if err != nil {
logInfo("websocket", "Could not open null sink pipeling. Disabling recording.")
return
}
defer sinkPipeline.Close()
defer sinkPipeline.Pipeline().Destroy()
}
if recordingPipeline != nil && sinkPipeline != nil {
logInfo("websocket", "Starting recording pipeline")
if err = recordingPipeline.Start(); err != nil {
if err = recordingPipeline.Pipeline().Start(); err != nil {
logInfo("websocket", "Could not start recording pipeline")
return
}
logInfo("websocket", "Starting sink pipeline")
if err = sinkPipeline.Start(); err != nil {
if err = sinkPipeline.Pipeline().Start(); err != nil {
logInfo("websocket", "Could not start sink pipeline")
return
}
if verbose {
setupVerbosePipelineListeners(sinkPipeline, "mic-null-sink")
setupVerbosePipelineListeners(sinkPipeline.Pipeline(), "mic-null-sink")
}
var runMicFunc func()
runMicFunc = func() {
if verbose {
setupVerbosePipelineListeners(recordingPipeline, "recorder")
setupVerbosePipelineListeners(recordingPipeline.Pipeline(), "recorder")
}
go io.Copy(recordingPipeline, wsconn)
go func() {
var lastState gst.State
for msg := range recordingPipeline.GetBus().MessageChan() {
for msg := range recordingPipeline.Pipeline().GetBus().MessageChan() {
defer msg.Unref()
switch msg.Type() {
case gst.MessageStateChanged:
if lastState == gst.StatePlaying && recordingPipeline.GetState() != gst.StatePlaying {
if lastState == gst.StatePlaying && recordingPipeline.Pipeline().GetState() != gst.StatePlaying {
var nerr error
recordingPipeline.Close()
recordingPipeline, nerr = newRecordingPipeline()
@@ -183,13 +181,13 @@ func handleWebsocketConnection(wsconn *websocket.Conn) {
return
}
logInfo("websocket", "Restarting recording pipeline")
if nerr = recordingPipeline.Start(); nerr != nil {
if nerr = recordingPipeline.Pipeline().Start(); nerr != nil {
logInfo("websocket", "Could not start new recording pipeline, stopping input stream")
}
runMicFunc()
return
}
lastState = recordingPipeline.GetState()
lastState = recordingPipeline.Pipeline().GetState()
}
}
}()
@@ -208,25 +206,17 @@ func handleWebsocketConnection(wsconn *websocket.Conn) {
return
}
defer srcFile.Close()
stat, err := srcFile.Stat()
if err != nil {
return
}
appSrc := playbackPipeline.GetAppSrc()
appSrc.SetSize(stat.Size())
appSrc.PushBuffer(srcFile)
for {
if ret := appSrc.EndStream(); ret == gst.FlowOK {
break
}
}
// stat, err := srcFile.Stat()
// if err != nil {
// return
// }
go io.Copy(playbackPipeline.(gstauto.ReadWritePipeliner), srcFile)
}
gst.Wait(playbackPipeline)
gst.Wait(playbackPipeline.Pipeline())
}
func newPlaybackPipelineFromString() (*gst.Pipeline, error) {
func newPlaybackPipelineFromString() (gstauto.ReadWritePipeliner, error) {
pipelineString := "decodebin ! audioconvert ! audioresample"
switch encoding {
@@ -240,18 +230,18 @@ func newPlaybackPipelineFromString() (*gst.Pipeline, error) {
logInfo("playback", "Using pipeline string", pipelineString)
}
return gst.NewPipelineFromLaunchString(pipelineString, gst.PipelineReadWrite|gst.PipelineUseGstApp)
return gstauto.NewPipelineReadWriterSimpleFromString(pipelineString)
}
func newPlaybackPipeline() (*gst.Pipeline, error) {
func newPlaybackPipeline() (gstauto.ReadPipeliner, error) {
if srcFile != "" {
return newPlaybackPipelineFromString()
}
cfg := &gst.PipelineConfig{Elements: []*gst.PipelineElement{}}
cfg := &gstauto.PipelineConfig{Elements: []*gstauto.PipelineElement{}}
pulseSrc := &gst.PipelineElement{
pulseSrc := &gstauto.PipelineElement{
Name: "pulsesrc",
Data: map[string]interface{}{"server": pulseServer},
SinkCaps: gst.NewRawCaps("S16LE", 24000, 2),
@@ -265,19 +255,19 @@ func newPlaybackPipeline() (*gst.Pipeline, error) {
switch encoding {
case "opus":
cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "cutter"})
cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "opusenc"})
cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "webmmux"})
cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "cutter"})
cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "opusenc"})
cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "webmmux"})
case "vorbis":
cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "vorbisenc"})
cfg.Elements = append(cfg.Elements, &gst.PipelineElement{Name: "oggmux"})
cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "vorbisenc"})
cfg.Elements = append(cfg.Elements, &gstauto.PipelineElement{Name: "oggmux"})
}
return gst.NewPipelineFromConfig(cfg, gst.PipelineRead|gst.PipelineUseGstApp, nil)
return gstauto.NewPipelineReaderSimpleFromConfig(cfg)
}
func newRecordingPipeline() (*gst.Pipeline, error) {
return gst.NewPipelineFromLaunchString(newPipelineStringFromOpts(), gst.PipelineWrite)
func newRecordingPipeline() (gstauto.WritePipeliner, error) {
return gstauto.NewPipelineWriterSimpleFromString(newPipelineStringFromOpts())
}
func newPipelineStringFromOpts() string {
@@ -290,9 +280,9 @@ func newPipelineStringFromOpts() string {
)
}
func newSinkPipeline() (*gst.Pipeline, error) {
cfg := &gst.PipelineConfig{
Elements: []*gst.PipelineElement{
func newSinkPipeline() (gstauto.Pipeliner, error) {
cfg := &gstauto.PipelineConfig{
Elements: []*gstauto.PipelineElement{
{
Name: "pulsesrc",
Data: map[string]interface{}{"server": pulseServer, "device": micName},
@@ -304,5 +294,5 @@ func newSinkPipeline() (*gst.Pipeline, error) {
},
},
}
return gst.NewPipelineFromConfig(cfg, gst.PipelineInternalOnly, nil)
return gstauto.NewPipelinerSimpleFromConfig(cfg)
}

View File

@@ -1,141 +1,6 @@
/*
Package gst provides wrappers for building gstreamer pipelines and then
reading and/or writing from either end of the pipeline.
It uses cgo to interface with the gstreamer-1.0 C API.
A simple opus/webm encoder created from a launch string could look like this:
import (
"os"
"github.com/tinyzimmer/go-gst-launch/gst"
)
func main() {
gst.Init()
encoder, err := gst.NewPipelineFromLaunchString("opusenc ! webmmux", gst.PipelineReadWrite)
if err != nil {
panic(err)
}
// You should close even if you don't start the pipeline, since this
// will free resources created by gstreamer.
defer encoder.Close()
if err := encoder.Start() ; err != nil {
panic(err)
}
go func() {
encoder.Write(...) // Write raw audio data to the pipeline
}()
// don't actually do this - copy encoded audio to stdout
if _, err := io.Copy(os.Stdout, encoder) ; err != nil {
panic(err)
}
}
You can accomplish the same thing using the "configuration" functionality provided by NewPipelineFromConfig().
Here is an example that will record from a pulse server and make opus/webm data available on the Reader.
import (
"io"
"os"
"github.com/tinyzimmer/go-gst-launch/gst"
)
func main() {
gst.Init()
encoder, err := gst.NewPipelineFromConfig(&gst.PipelineConfig{
Plugins: []*gst.Plugin{
{
Name: "pulsesrc",
Data: map[string]interface{}{
"server": "/run/user/1000/pulse/native",
"device": "playback-device.monitor",
},
SinkCaps: gst.NewRawCaps("S16LE", 24000, 2),
},
{
Name: "opusenc",
},
{
Name: "webmmux",
},
},
}, gst.PipelineRead, nil)
if err != nil {
panic(err)
}
defer encoder.Close()
if err := encoder.Start() ; err != nil {
panic(err)
}
// Create an output file
f, err := os.Create("out.opus")
if err != nil {
panic(err)
}
// Copy the data from the pipeline to the file
if err := io.Copy(f, encoder) ; err != nil {
panic(err)
}
}
There are two channels exported for listening for messages from the pipeline.
An example of listening to messages on a fake pipeline for 10 seconds:
package main
import (
"fmt"
"time"
"github.com/tinyzimmer/go-gst-launch/gst"
)
func main() {
gst.Init()
pipeline, err := gst.NewPipelineFromLaunchString("audiotestsrc ! fakesink", gst.PipelineInternalOnly)
if err != nil {
panic(err)
}
defer pipeline.Close()
go func() {
for msg := range pipeline.MessageChan() {
fmt.Println("Got message:", msg.TypeName())
}
}()
go func() {
for msg := range pipeline.ErrorChan() {
fmt.Println("Got error:", err)
}
}()
if err := pipeline.Start(); err != nil {
fmt.Println("Pipeline failed to start")
return
}
time.Sleep(time.Second * 10)
}
The package also exposes some low level functionality for building pipelines
and doing dynamic linking yourself. See the NewPipeline() function for creating an
empty pipeline that you can then build out using the other structs and methods provided
by this package.
Package gst contains bindings for the gstreamer C API. If you are trying
to build simple pipelines quickly (and optiionally readers/writers) see
the gstauto package.
*/
package gst

View File

@@ -67,18 +67,4 @@ func (a *AppSrc) PushBuffer(data io.Reader) FlowReturn {
return FlowReturn(ret)
}
// FlowReturn is go type casting for GstFlowReturn.
type FlowReturn C.GstFlowReturn
// Type casting of the GstFlowReturn types. Custom ones are omitted for now.
const (
FlowOK FlowReturn = C.GST_FLOW_OK // Data passing was ok
FlowNotLinked = C.GST_FLOW_NOT_LINKED // Pad is not linked
FlowFlushing = C.GST_FLOW_FLUSHING // Pad is flushing
FlowEOS = C.GST_FLOW_EOS // Pad is EOS
FlowNotNegotiated = C.GST_FLOW_NOT_NEGOTIATED // Pad is not negotiated
FlowError = C.GST_FLOW_ERROR // Some (fatal) error occurred
FlowNotSupported = C.GST_FLOW_NOT_SUPPORTED // The operation is not supported.
)
func wrapAppSrc(elem *Element) *AppSrc { return &AppSrc{elem} }

View File

@@ -196,3 +196,17 @@ const (
MiniObjectFlagMayBeLeaked MiniObjectFlags = C.GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED // (4) the object is expected to stay alive even after gst_deinit has been called and so should be ignored by leak detection tools. (Since: 1.10)
MiniObjectFlagLast MiniObjectFlags = C.GST_MINI_OBJECT_FLAG_LAST // (16) first flag that can be used by subclasses.
)
// FlowReturn is go type casting for GstFlowReturn.
type FlowReturn int
// Type casting of the GstFlowReturn types. Custom ones are omitted for now.
const (
FlowOK FlowReturn = C.GST_FLOW_OK // Data passing was ok
FlowNotLinked FlowReturn = C.GST_FLOW_NOT_LINKED // Pad is not linked
FlowFlushing FlowReturn = C.GST_FLOW_FLUSHING // Pad is flushing
FlowEOS FlowReturn = C.GST_FLOW_EOS // Pad is EOS
FlowNotNegotiated FlowReturn = C.GST_FLOW_NOT_NEGOTIATED // Pad is not negotiated
FlowError FlowReturn = C.GST_FLOW_ERROR // Some (fatal) error occurred
FlowNotSupported FlowReturn = C.GST_FLOW_NOT_SUPPORTED // The operation is not supported.
)

View File

@@ -39,8 +39,11 @@ func NewMiniObject(flags MiniObjectFlags, gtype glib.Type) *MiniObject {
// native returns the pointer to the underlying object.
func (m *MiniObject) unsafe() unsafe.Pointer { return m.ptr }
// Parent returns the parent of this MiniObject
func (m *MiniObject) Parent() *MiniObject { return m.parent }
// Instance returns the native GstMiniObject instance.
func (m *MiniObject) Instance() *C.GstMiniObject { return C.toGstMiniObject(m.ptr) }
func (m *MiniObject) Instance() *C.GstMiniObject { return C.toGstMiniObject(m.unsafe()) }
// Ref increases the ref count on this object by one.
func (m *MiniObject) Ref() { C.gst_mini_object_ref(m.Instance()) }

View File

@@ -9,39 +9,14 @@ package gst
import "C"
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"strings"
"unsafe"
"github.com/gotk3/gotk3/glib"
)
// PipelineFlags represents arguments passed to a new Pipeline.
type PipelineFlags int
const (
// PipelineInternalOnly signals that this pipeline only handles data internally.
PipelineInternalOnly PipelineFlags = 1 << iota
// PipelineRead signals that the Read() method can be used on the end of this pipeline.
PipelineRead
// PipelineWrite signals that the Write() method can be used on the start of this pipeline.
PipelineWrite
// PipelineUseGstApp signals the desire to use an AppSink or AppSrc instead of the default
// os pipes, fdsrc, and fdsink.
// When using this flag, you should interact with the pipeline using the GetAppSink and
// GetAppSrc methods.
PipelineUseGstApp
// PipelineReadWrite signals that this pipeline can be both read and written to.
PipelineReadWrite = PipelineRead | PipelineWrite
)
// has returns true if these flags contain the given flag.
func (p PipelineFlags) has(b PipelineFlags) bool { return p&b != 0 }
// Pipeline is a go implementation of a GstPipeline. Helper methods are provided for constructing
// pipelines either using file descriptors or the Appsrc/Appsink APIs. The struct itself implements
// a ReadWriteCloser.
@@ -51,47 +26,25 @@ type Pipeline struct {
// a local reference to the bus so duplicates aren't created
// when retrieved by the user
bus *Bus
// The buffers backing the Read and Write methods
destBuf *bufio.Reader
srcBuf *bufio.Writer
// used with PipelineWrite
srcReader, srcWriter *os.File
// used with PipelineRead
destReader, destWriter *os.File
// used with PipelineWrite AND PipelineGstApp
appSrc *AppSrc
// used with PipelineRead AND PipelineGstApp
appSink *AppSink
autoFlush bool // when set to true, the contents of the app sink are automatically flushed to the read buffer.
// The element that represents the source/dest pipeline
// and any caps to apply to it.
srcElement *Element
srcCaps *Caps
destElement *Element
// whether or not the pipeline was built from a string. this is checked when
// starting to see who is responsible for build and linking the buffers.
pipelineFromHelper bool
// A channel where a caller can listen for errors asynchronously.
errCh chan error
// A channel where a caller can listen for messages
msgCh []chan *Message
}
func newEmptyPipeline() (*C.GstPipeline, error) {
pipeline := C.gst_pipeline_new((*C.gchar)(nil))
// NewPipeline allocates and returns a new empty pipeline. If name is empty, one
// is generated by gstreamer.
func NewPipeline(name string) (*Pipeline, error) {
var cChar *C.char
if name != "" {
cChar = C.CString(name)
defer C.free(unsafe.Pointer(cChar))
}
pipeline := C.gst_pipeline_new((*C.gchar)(cChar))
if pipeline == nil {
return nil, errors.New("Could not create new pipeline")
}
return C.toGstPipeline(unsafe.Pointer(pipeline)), nil
return wrapPipeline(glib.Take(unsafe.Pointer(pipeline))), nil
}
func newPipelineFromString(launchv string) (*C.GstPipeline, error) {
// NewPipelineFromString creates a new gstreamer pipeline from the given launch string.
func NewPipelineFromString(launchv string) (*Pipeline, error) {
if len(strings.Split(launchv, "!")) < 2 {
return nil, fmt.Errorf("Given string is too short for a pipeline: %s", launchv)
}
@@ -104,143 +57,12 @@ func newPipelineFromString(launchv string) (*C.GstPipeline, error) {
errMsg := C.GoString(gerr.message)
return nil, errors.New(errMsg)
}
return C.toGstPipeline(unsafe.Pointer(pipeline)), nil
}
// NewPipeline builds and returns a new empty Pipeline instance.
func NewPipeline(flags PipelineFlags) (*Pipeline, error) {
pipelineElement, err := newEmptyPipeline()
if err != nil {
return nil, err
}
pipeline := wrapPipeline(glib.Take(unsafe.Pointer(pipelineElement)))
if err := applyFlags(pipeline, flags); err != nil {
return nil, err
}
return pipeline, nil
}
func applyFlags(pipeline *Pipeline, flags PipelineFlags) error {
// If the user wants to be able to write to the pipeline, set up the
// write-buffers
if flags.has(PipelineWrite) {
// Set up a pipe
if err := pipeline.setupWriters(); err != nil {
return err
}
}
// If the user wants to be able to read from the pipeline, setup the
// read-buffers.
if flags.has(PipelineRead) {
if err := pipeline.setupReaders(); err != nil {
return err
}
}
return nil
}
func (p *Pipeline) setupWriters() error {
var err error
p.srcReader, p.srcWriter, err = os.Pipe()
if err != nil {
return err
}
p.srcBuf = bufio.NewWriter(p.srcWriter)
return nil
}
func (p *Pipeline) setupReaders() error {
var err error
p.destReader, p.destWriter, err = os.Pipe()
if err != nil {
return err
}
p.destBuf = bufio.NewReader(p.destReader)
return nil
return wrapPipeline(glib.Take(unsafe.Pointer(pipeline))), nil
}
// Instance returns the native GstPipeline instance.
func (p *Pipeline) Instance() *C.GstPipeline { return C.toGstPipeline(p.unsafe()) }
// Read implements a Reader and returns data from the read buffer.
func (p *Pipeline) Read(b []byte) (int, error) {
if p.destBuf == nil {
return 0, io.ErrClosedPipe
}
return p.destBuf.Read(b)
}
// readerFd returns the file descriptor for the read buffer, or 0 if
// there isn't one. It returns the file descriptor that can be written to
// by gstreamer.
func (p *Pipeline) readerFd() uintptr {
if p.destWriter == nil {
return 0
}
return p.destWriter.Fd()
}
// Write implements a Writer and places data in the write buffer.
func (p *Pipeline) Write(b []byte) (int, error) {
if p.srcBuf == nil {
return 0, io.ErrClosedPipe
}
return p.srcBuf.Write(b)
}
// writerFd returns the file descriptor for the write buffer, or 0 if
// there isn't one. It returns the file descriptor that can be read from
// by gstreamer.
func (p *Pipeline) writerFd() uintptr {
if p.srcWriter == nil {
return 0
}
return p.srcReader.Fd()
}
// SetWriterCaps sets the caps on the write-buffer. You will usually want to call this
// on a custom pipeline, unless you are using downstream elements that do dynamic pad
// linking.
func (p *Pipeline) SetWriterCaps(caps *Caps) { p.srcCaps = caps }
// LinkWriterTo links the write buffer on this Pipeline to the given element. This must
// be called when the pipeline is constructed with PipelineWrite or PipelineReadWrite.
func (p *Pipeline) LinkWriterTo(elem *Element) { p.srcElement = elem }
// LinkReaderTo links the read buffer on this Pipeline to the given element. This must
// be called when the pipeline is constructed with PipelineRead or PipelineReadWrite.
func (p *Pipeline) LinkReaderTo(elem *Element) { p.destElement = elem }
// IsUsingGstApp returns true if the current pipeline is using GstApp instead of file descriptors.
func (p *Pipeline) IsUsingGstApp() bool {
return p.appSrc != nil || p.appSink != nil
}
// GetAppSrc returns the AppSrc for this pipeline if created with PipelineUseGstApp.
// Unref after usage.
func (p *Pipeline) GetAppSrc() *AppSrc {
if p.appSrc == nil {
return nil
}
// increases the ref count on the element
return wrapAppSrc(p.appSrc.Element)
}
// GetAppSink returns the AppSink for this pipeline if created with PipelineUseGstApp.
// Unref after usage.
func (p *Pipeline) GetAppSink() *AppSink {
if p.appSink == nil {
return nil
}
// increases the ref count
return wrapAppSink(p.appSink.Element)
}
// GetBus returns the message bus for this pipeline.
func (p *Pipeline) GetBus() *Bus {
if p.bus == nil {
@@ -250,245 +72,23 @@ func (p *Pipeline) GetBus() *Bus {
return p.bus
}
// SetAutoFlush sets whether or not samples should be automatically flushed to the read-buffer
// (default for pipelines not built with PipelineUseGstApp) and if messages should be flushed
// on the bus when the pipeline is stopped.
func (p *Pipeline) SetAutoFlush(b bool) {
p.Set("auto-flush-bus", b)
p.autoFlush = b
}
// AutoFlush returns true if the pipeline is using a GstAppSink and is configured to autoflush to the
// read-buffer.
func (p *Pipeline) AutoFlush() bool { return p.IsUsingGstApp() && p.autoFlush }
// Flush flushes the app sink to the read buffer. It is usually more desirable to interface
// with the PullSample and BlockPullSample methods on the AppSink interface directly. Or
// to set autoflush to true.
func (p *Pipeline) Flush() error {
sample, err := p.appSink.PullSample()
if err != nil { // err signals end of stream
return err
}
if sample == nil {
return nil
}
defer sample.Unref()
if _, err := io.Copy(p.destWriter, sample.GetBuffer()); err != nil {
return err
}
return nil
}
// BlockFlush is like Flush but it blocks until a sample is available. This is intended for
// use with PipelineUseGstApp.
func (p *Pipeline) BlockFlush() error {
sample, err := p.appSink.BlockPullSample()
if err != nil { // err signals end of stream
return err
}
if sample == nil {
return nil
}
defer sample.Unref()
if _, err := io.Copy(p.destWriter, sample.GetBuffer()); err != nil {
return err
}
return nil
}
// setupSrc sets up a source element with the given configuration.
func (p *Pipeline) setupSrc(pluginName string, args map[string]interface{}) (*Element, error) {
elem, err := NewElement(pluginName)
if err != nil {
return nil, err
}
for k, v := range args {
if err := elem.Set(k, v); err != nil {
return nil, err
}
}
if err := p.Add(elem); err != nil {
return nil, err
}
if p.srcCaps != nil {
return elem, elem.LinkFiltered(p.srcElement, p.srcCaps)
}
return elem, elem.Link(p.srcElement)
}
// setupFdSrc will setup a fdsrc as the source of the pipeline.
func (p *Pipeline) setupFdSrc() error {
_, err := p.setupSrc("fdsrc", map[string]interface{}{
"fd": p.writerFd(),
})
return err
}
// setupAppSrc sets up an appsrc as the source of the pipeline
func (p *Pipeline) setupAppSrc() error {
appSrc, err := p.setupSrc("appsrc", map[string]interface{}{
"block": true, // TODO: make this configurable
"emit-signals": false, // https://gstreamer.freedesktop.org/documentation/app/appsrc.html?gi-language=c
})
if err != nil {
return err
}
p.appSrc = &AppSrc{appSrc}
return nil
}
// setupSrcElement will setup the source element when the pipeline is constructed with
// PipelineWrite.
func (p *Pipeline) setupSrcElement() error {
if p.srcElement == nil {
return errors.New("Pipeline was constructed with PipelineWrite but LinkWriterTo was never called")
}
if p.IsUsingGstApp() {
return p.setupAppSrc()
}
return p.setupFdSrc()
}
// setupSink sets up a sink element with the given congifuration.
func (p *Pipeline) setupSink(pluginName string, args map[string]interface{}) (*Element, error) {
elem, err := NewElement(pluginName)
if err != nil {
return nil, err
}
for k, v := range args {
if err := elem.Set(k, v); err != nil {
return nil, err
}
}
if err := p.Add(elem); err != nil {
return nil, err
}
return elem, p.destElement.Link(elem)
}
// setupFdSink sets up a fdsink as the sink of the pipeline.
func (p *Pipeline) setupFdSink() error {
_, err := p.setupSink("fdsink", map[string]interface{}{
"fd": p.readerFd(),
})
return err
}
// setupAppSink sets up an appsink as the sink of the pipeline.
func (p *Pipeline) setupAppSink() error {
appSink, err := p.setupSink("appsink", map[string]interface{}{
"emit-signals": false,
})
if err != nil {
return err
}
p.appSink = wrapAppSink(appSink)
return nil
}
// setupDestElement will setup the destination (sink) element when the pipeline is constructed with
// PipelineRead.
func (p *Pipeline) setupDestElement() error {
if p.destElement == nil {
return errors.New("Pipeline was constructed with PipelineRead but LinkReaderTo was never called")
}
if p.IsUsingGstApp() {
return p.setupAppSink()
}
return p.setupFdSink()
}
// Start will start the GstPipeline. It is asynchronous so it does not need to be
// called within a goroutine, however, it is still safe to do so.
func (p *Pipeline) Start() error {
// If there is a write buffer on this pipeline, set up an fdsrc
if p.srcBuf != nil && !p.pipelineFromHelper {
if err := p.setupSrcElement(); err != nil {
return err
}
}
// If there is a read buffer on this pipeline, set up an fdsink
if p.destBuf != nil && !p.pipelineFromHelper {
if err := p.setupDestElement(); err != nil {
return err
}
}
return p.startPipeline()
return p.SetState(StatePlaying)
}
func (p *Pipeline) closeBuffers() error {
if p.srcBuf != nil && p.srcReader != nil && p.srcWriter != nil {
if err := p.srcReader.Close(); err != nil {
return err
}
if err := p.srcWriter.Close(); err != nil {
return err
}
p.srcBuf = nil
}
if p.destBuf != nil && p.destReader != nil && p.destWriter != nil {
if err := p.destReader.Close(); err != nil {
return err
}
if err := p.destWriter.Close(); err != nil {
return err
}
p.destBuf = nil
// Destroy will attempt to stop the pipeline and then unref once the stream has
// fully completed.
func (p *Pipeline) Destroy() error {
if err := p.BlockSetState(StateNull); err != nil {
return err
}
p.Unref()
return nil
}
// ReadBufferSize returns the current size of the unread portion of the read-buffer.
func (p *Pipeline) ReadBufferSize() int {
if p.destBuf == nil {
return 0
}
return p.destBuf.Buffered()
}
// WriteBufferSize returns the current size of the unread portion of the write-buffer.
func (p *Pipeline) WriteBufferSize() int {
if p.srcBuf == nil {
return 0
}
return p.srcBuf.Buffered()
}
// TotalBufferSize returns the sum of the Read and Write buffer unread portions.
func (p *Pipeline) TotalBufferSize() int { return p.WriteBufferSize() + p.ReadBufferSize() }
// Close implements a Closer and closes all buffers.
func (p *Pipeline) Close() error {
defer p.Unref()
if err := p.closeBuffers(); err != nil {
return err
}
return p.SetState(StateNull)
}
// startPipeline will set the GstPipeline to the PLAYING state.
func (p *Pipeline) startPipeline() error {
if err := p.SetState(StatePlaying); err != nil {
return err
}
// If using GstApp with autoflush
if p.AutoFlush() {
go func() {
for {
if err := p.BlockFlush(); err != nil {
// err signals end of stream
return
}
}
}()
}
return nil
}
// Wait waits for the given pipeline to reach end of stream.
// Wait waits for the given pipeline to reach end of stream or be stopped.
func Wait(p *Pipeline) {
if p.Instance() == nil {
return

View File

@@ -1,194 +0,0 @@
package gst
import "fmt"
// PipelineConfig represents a list of elements and their configurations
// to be used with NewPipelineFromConfig.
type PipelineConfig struct {
Elements []*PipelineElement
}
// GetElementByName returns the Element configuration for the given name.
func (p *PipelineConfig) GetElementByName(name string) *PipelineElement {
for _, elem := range p.Elements {
if name == elem.GetName() {
return elem
}
}
return nil
}
// ElementNames returns a string slice of the names of all the plugins.
func (p *PipelineConfig) ElementNames() []string {
names := make([]string, 0)
for _, elem := range p.Elements {
names = append(names, elem.GetName())
}
return names
}
// pushPluginToTop pushes a plugin to the top of the list.
func (p *PipelineConfig) pushPluginToTop(elem *PipelineElement) {
newSlc := []*PipelineElement{elem}
newSlc = append(newSlc, p.Elements...)
p.Elements = newSlc
}
// PipelineElement represents an `GstElement` in a `GstPipeline` when building a Pipeline with `NewPipelineFromConfig`.
// The Name should coorespond to a valid gstreamer plugin name. The data are additional
// fields to set on the element. If SinkCaps is non-nil, they are applied to the sink of this
// element.
type PipelineElement struct {
Name string
SinkCaps *Caps
Data map[string]interface{}
}
// GetName returns the name to use when creating Elements from this configuration.
func (p *PipelineElement) GetName() string { return p.Name }
// NewPipelineFromConfig builds a new pipeline from the given PipelineConfig. The plugins provided
// in the configuration will be linked in the order they are given.
// If using PipelineWrite, you can optionally pass a Caps object to filter between the write-buffer
// and the start of the pipeline.
func NewPipelineFromConfig(cfg *PipelineConfig, flags PipelineFlags, caps *Caps) (pipeline *Pipeline, err error) {
// create a new empty pipeline instance
pipeline, err = NewPipeline(flags)
if err != nil {
return nil, err
}
// if any error happens while setting up the pipeline, immediately free it
defer func() {
if err != nil {
if cerr := pipeline.Close(); cerr != nil {
fmt.Println("Failed to close pipeline:", err)
}
}
}()
if cfg.Elements == nil {
cfg.Elements = make([]*PipelineElement, 0)
}
if flags.has(PipelineWrite) {
if flags.has(PipelineUseGstApp) {
cfg.pushPluginToTop(&PipelineElement{
Name: "appsrc",
Data: map[string]interface{}{
"block": true, // TODO: make these all configurable
"emit-signals": false, // https://gstreamer.freedesktop.org/documentation/app/appsrc.html?gi-language=c
"is-live": true,
"max-bytes": 200000,
// "size": 0, // If this is known we should specify it
},
SinkCaps: caps,
})
} else {
cfg.pushPluginToTop(&PipelineElement{
Name: "fdsrc",
Data: map[string]interface{}{
"fd": pipeline.writerFd(),
},
SinkCaps: caps,
})
}
}
if flags.has(PipelineRead) {
if flags.has(PipelineUseGstApp) {
cfg.Elements = append(cfg.Elements, &PipelineElement{
Name: "appsink",
Data: map[string]interface{}{
"emit-signals": false,
},
})
} else {
cfg.Elements = append(cfg.Elements, &PipelineElement{
Name: "fdsink",
Data: map[string]interface{}{
"fd": pipeline.readerFd(),
},
})
}
}
// retrieve a list of the plugin names
pluginNames := cfg.ElementNames()
// build all the elements
var elements map[int]*Element
elements, err = NewElementMany(pluginNames...)
if err != nil {
return
}
// iterate the plugin names and add them to the pipeline
for idx, name := range pluginNames {
// get the current plugin and element
currentPlugin := cfg.GetElementByName(name)
currentElem := elements[idx]
// Iterate any data with the plugin and set it on the element
for key, value := range currentPlugin.Data {
if err = currentElem.Set(key, value); err != nil {
return
}
}
// Add the element to the pipeline
if err = pipeline.Add(currentElem); err != nil {
return
}
// If this is the first element continue
if idx == 0 {
continue
}
// get the last element in the chain
lastPluginName := pluginNames[idx-1]
lastElem := elements[idx-1]
lastPlugin := cfg.GetElementByName(lastPluginName)
if lastPlugin == nil {
// this should never happen, since only used internally,
// but safety from panic
continue
}
// If this is the second element and we are configuring writing
// call link on the last element
if idx == 1 && flags.has(PipelineWrite) {
pipeline.LinkWriterTo(lastElem)
if flags.has(PipelineUseGstApp) {
pipeline.appSrc = wrapAppSrc(lastElem)
}
}
// If this is the last element and we are configuring reading
// call link on the element
if idx == len(pluginNames)-1 && flags.has(PipelineRead) {
pipeline.LinkReaderTo(currentElem)
if flags.has(PipelineUseGstApp) {
pipeline.appSink = wrapAppSink(currentElem)
}
}
// If there are sink caps on the last element, do a filtered link to this one and continue
if lastPlugin.SinkCaps != nil {
if err = lastElem.LinkFiltered(currentElem, lastPlugin.SinkCaps); err != nil {
return
}
continue
}
// link the last element to this element
if err = lastElem.Link(currentElem); err != nil {
return
}
}
pipeline.pipelineFromHelper = true
return
}

View File

@@ -1,145 +0,0 @@
package gst
import (
"errors"
"fmt"
"strings"
"unsafe"
"github.com/gotk3/gotk3/glib"
)
// NewPipelineFromLaunchString returns a new GstPipeline from the given launch string. If flags
// contain PipelineRead or PipelineWrite, the launch string is further formatted accordingly.
//
// If using PipelineWrite, you should generally start your pipeline with the caps of the source.
func NewPipelineFromLaunchString(launchStr string, flags PipelineFlags) (*Pipeline, error) {
// reformat the string to point at the writerFd
if flags.has(PipelineWrite) {
if flags.has(PipelineUseGstApp) {
if launchStr == "" {
launchStr = "appsrc"
} else {
launchStr = fmt.Sprintf("appsrc ! %s", launchStr)
}
} else {
if launchStr == "" {
launchStr = "fdsrc"
} else {
launchStr = fmt.Sprintf("fdsrc ! %s", launchStr)
}
}
}
if flags.has(PipelineRead) {
if flags.has(PipelineUseGstApp) {
if launchStr == "" {
launchStr = "appsink emit-signals=false"
} else {
launchStr = fmt.Sprintf("%s ! appsink emit-signals=false", launchStr)
}
} else {
if launchStr == "" {
launchStr = "fdsink"
} else {
launchStr = fmt.Sprintf("%s ! fdsink", launchStr)
}
}
}
pipelineElement, err := newPipelineFromString(launchStr)
if err != nil {
return nil, err
}
pipeline := wrapPipeline(glib.Take(unsafe.Pointer(pipelineElement)))
if err := applyFlags(pipeline, flags); err != nil {
return nil, err
}
if flags.has(PipelineWrite) {
sources, err := pipeline.GetSourceElements()
if err != nil {
return nil, err
}
var srcType string
if flags.has(PipelineUseGstApp) {
srcType = "appsrc"
} else {
srcType = "fdsrc"
}
var pipelineSrc *Element
for _, src := range sources {
if strings.Contains(src.Name(), srcType) {
pipelineSrc = src
} else {
src.Unref()
}
}
if pipelineSrc == nil {
return nil, errors.New("Could not detect pipeline source")
}
defer pipelineSrc.Unref()
if flags.has(PipelineUseGstApp) {
pipeline.appSrc = wrapAppSrc(pipelineSrc)
} else {
if err := pipelineSrc.Set("fd", pipeline.writerFd()); err != nil {
return nil, err
}
}
}
if flags.has(PipelineRead) {
sinks, err := pipeline.GetSinkElements()
if err != nil {
return nil, err
}
var sinkType string
if flags.has(PipelineUseGstApp) {
sinkType = "appsink"
} else {
sinkType = "fdsink"
}
var pipelineSink *Element
for _, sink := range sinks {
if strings.Contains(sink.Name(), sinkType) {
pipelineSink = sink
} else {
sink.Unref()
}
}
if pipelineSink == nil {
return nil, errors.New("Could not detect pipeline sink")
}
defer pipelineSink.Unref()
if flags.has(PipelineUseGstApp) {
pipeline.appSink = wrapAppSink(pipelineSink)
} else {
if err := pipelineSink.Set("fd", pipeline.readerFd()); err != nil {
return nil, err
}
}
}
// signal that this pipeline was made from a string and therefore already linked
pipeline.pipelineFromHelper = true
return pipeline, err
}

View File

@@ -42,9 +42,13 @@ func NewStructureFromString(stStr string) *Structure {
return wrapStructure(structure)
}
// StructureFromGValue extracts the GstStructure from a glib.Value.
// StructureFromGValue extracts the GstStructure from a glib.Value, or nil
// if one does not exist.
func StructureFromGValue(gval *glib.Value) *Structure {
st := C.gst_value_get_structure((*C.GValue)(gval.Native()))
if st == nil {
return nil
}
return wrapStructure(st)
}

101
gst/gstauto/bufio.go Normal file
View File

@@ -0,0 +1,101 @@
package gstauto
import (
"bufio"
"io"
"os"
)
// Blank assertions to ensure interfaces are implemented.
var _ io.ReadCloser = &readCloser{}
var _ io.WriteCloser = &writeCloser{}
var _ io.ReadWriteCloser = &readWriteCloser{}
// readCloser is a struct that provides a read buffer that can also be written to
// internally.
type readCloser struct {
rReader, rWriter *os.File
rBuf *bufio.Reader
}
// newReadCloser returns a new readCloser.
func newReadCloser() (*readCloser, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &readCloser{
rReader: r,
rWriter: w,
rBuf: bufio.NewReader(r),
}, nil
}
// Read implements a Reader for objects embdedding this struct.
func (r *readCloser) Read(p []byte) (int, error) { return r.rBuf.Read(p) }
// Close implements a Closer for objects embedding this struct.
func (r *readCloser) Close() error {
if err := r.rWriter.Close(); err != nil {
return err
}
return r.rReader.Close()
}
// writeCloser is a struct that provides a read buffer that can also be
// read from internally.
type writeCloser struct {
wReader, wWriter *os.File
wBuf *bufio.Writer
}
// newWriteCloser returns a new writeCloser.
func newWriteCloser() (*writeCloser, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &writeCloser{
wReader: r,
wWriter: w,
wBuf: bufio.NewWriter(w),
}, nil
}
// Write implements a Writer for objects embedding this struct.
func (w *writeCloser) Write(p []byte) (int, error) { return w.wBuf.Write(p) }
// Close implements a Closer for objects embedding this struct.
func (w *writeCloser) Close() error {
if err := w.wWriter.Close(); err != nil {
return err
}
return w.wReader.Close()
}
// readWriteCloser is a struct that provides both read and write buffers.
type readWriteCloser struct {
*readCloser
*writeCloser
}
// newReadWriteCloser returns a new readWriteCloser.
func newReadWriteCloser() (*readWriteCloser, error) {
rCloser, err := newReadCloser()
if err != nil {
return nil, err
}
wCloser, err := newWriteCloser()
if err != nil {
return nil, err
}
return &readWriteCloser{readCloser: rCloser, writeCloser: wCloser}, nil
}
// Close implements a Closer for objects embedding this struct.
func (rw *readWriteCloser) Close() error {
if err := rw.writeCloser.Close(); err != nil {
return err
}
return rw.readCloser.Close()
}

4
gst/gstauto/doc.go Normal file
View File

@@ -0,0 +1,4 @@
// Package gstauto contains helper methods and objects for building pipelines that
// satisfy most use cases. It provides an abstraction over the lower-level building
// blocks provided in the gst package.
package gstauto

View File

@@ -0,0 +1,141 @@
package gstauto
import (
"errors"
"fmt"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// PipelineElement represents an `GstElement` in a `GstPipeline` when building a Pipeline with `NewPipelineFromConfig`.
// The Name should coorespond to a valid gstreamer plugin name. The data are additional
// fields to set on the element. If SinkCaps is non-nil, they are applied to the sink of this
// element.
type PipelineElement struct {
Name string
SinkCaps *gst.Caps
Data map[string]interface{}
}
// GetName returns the name to use when creating Elements from this configuration.
func (p *PipelineElement) GetName() string { return p.Name }
// PipelineConfig represents a list of elements and their configurations
// to be used with NewPipelineFromConfig.
type PipelineConfig struct {
Elements []*PipelineElement
}
// GetElementByName returns the Element configuration for the given name.
func (p *PipelineConfig) GetElementByName(name string) *PipelineElement {
for _, elem := range p.Elements {
if name == elem.GetName() {
return elem
}
}
return nil
}
// ElementNames returns a string slice of the names of all the plugins.
func (p *PipelineConfig) ElementNames() []string {
names := make([]string, 0)
for _, elem := range p.Elements {
names = append(names, elem.GetName())
}
return names
}
// pushPluginToTop pushes a plugin to the top of the list.
func (p *PipelineConfig) pushPluginToTop(elem *PipelineElement) {
newSlc := []*PipelineElement{elem}
newSlc = append(newSlc, p.Elements...)
p.Elements = newSlc
}
// Apply applies this configuration to the given Pipeline.
func (p *PipelineConfig) Apply(pipeline *gst.Pipeline) error {
// build all the elements
elementNames := p.ElementNames()
elements, err := gst.NewElementMany(elementNames...)
if err != nil {
return err
}
// iterate the element names and add them to the pipeline
for idx, name := range elementNames {
// get the current config and element
currentCfg := p.GetElementByName(name)
currentElem := elements[idx]
// Iterate any data with the plugin and set it on the element
for key, value := range currentCfg.Data {
if err := currentElem.Set(key, value); err != nil {
return err
}
}
// Add the element to the pipeline
if err := pipeline.Add(currentElem); err != nil {
return err
}
// If this is the first element continue
if idx == 0 {
continue
}
// get the last element in the chain
lastElemName := elementNames[idx-1]
lastElem := elements[idx-1]
lastCfg := p.GetElementByName(lastElemName)
if lastCfg == nil {
// this would never happen unless someone is messing with memory,
// but safety from panic
continue
}
// If there are sink caps on the last element, do a filtered link to this one and continue
if lastCfg.SinkCaps != nil {
if err := lastElem.LinkFiltered(currentElem, lastCfg.SinkCaps); err != nil {
return err
}
continue
}
// link the last element to this element
if err := lastElem.Link(currentElem); err != nil {
return err
}
}
return nil
}
// NewPipelineFromConfig builds a new pipeline from the given PipelineConfig. The plugins provided
// in the configuration will be linked in the order they are given.
func NewPipelineFromConfig(cfg *PipelineConfig) (*gst.Pipeline, error) {
if cfg.Elements == nil {
return nil, errors.New("Element cannot be empty in the configuration")
}
// create a new empty pipeline instance
pipeline, err := gst.NewPipeline("")
if err != nil {
return nil, err
}
// if any error happens while setting up the pipeline, immediately free it
defer func() {
if err != nil {
if destroyErr := pipeline.Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", err.Error())
}
}
}()
if err = cfg.Apply(pipeline); err != nil {
return nil, err
}
return pipeline, nil
}

View File

@@ -0,0 +1,32 @@
package gstauto
import (
"io"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// Pipeliner is a the base interface for structs extending the functionality of
// the Pipeline object. It provides a single method which returns the underlying
// Pipeline object.
type Pipeliner interface {
Pipeline() *gst.Pipeline
}
// ReadPipeliner is a Pipeliner that also implements a ReadCloser.
type ReadPipeliner interface {
Pipeliner
io.ReadCloser
}
// WritePipeliner is a Pipeliner that also implements a WriteCloser.
type WritePipeliner interface {
Pipeliner
io.WriteCloser
}
// ReadWritePipeliner is a Pipeliner that also implements a ReadWriteCloser.
type ReadWritePipeliner interface {
ReadPipeliner
WritePipeliner
}

View File

@@ -0,0 +1,79 @@
package gstauto
import (
"fmt"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// Empty assignment to ensure PipelineReader satisfies the ReadPipeliner interface.
var _ ReadPipeliner = &PipelineReader{}
// PipelineReader is the base struct to be used to implement ReadPipeliners.
type PipelineReader struct {
*readCloser
pipeline *gst.Pipeline
}
// NewPipelineReader returns a new PipelineReader with an empty pipeline. Use an empty name
// to have gstreamer auto-generate one. This method is intended for use in the construction
// of other interfaces.
func NewPipelineReader(name string) (*PipelineReader, error) {
pipeline, err := gst.NewPipeline(name)
if err != nil {
return nil, err
}
rCloser, err := newReadCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineReader{
readCloser: rCloser,
pipeline: pipeline,
}, nil
}
// NewPipelineReaderFromString returns a new PipelineReader with a pipeline populated
// by the provided gstreamer launch string. If you are looking to build a simple
// ReadPipeliner you probably want to use NewPipelineReaderSimpleFromString.
func NewPipelineReaderFromString(launchStr string) (*PipelineReader, error) {
pipeline, err := gst.NewPipelineFromString(launchStr)
if err != nil {
return nil, err
}
rCloser, err := newReadCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineReader{
readCloser: rCloser,
pipeline: pipeline,
}, nil
}
// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the
// Pipeliner interface.
func (r *PipelineReader) Pipeline() *gst.Pipeline { return r.pipeline }
// ReaderFd returns the file descriptor that can be written to for the read-buffer. This value
// is used when wanting to allow an underlying pipeline to write to the internal buffer (e.g. when using a fdsink).
func (r *PipelineReader) ReaderFd() uintptr { return r.readCloser.rWriter.Fd() }
// Close will stop and unref the underlying pipeline.
func (r *PipelineReader) Close() error {
if err := r.Pipeline().Destroy(); err != nil {
return err
}
return r.readCloser.Close()
}
// CloseAsync will close the underlying pipeline asynchronously. It is the caller's
// responsibility to call Unref on the pipeline and close buffers once it is no longer being used.
// This can be accomplished via calling a regular Close (which is idempotent).
func (r *PipelineReader) CloseAsync() error { return r.pipeline.SetState(gst.StateNull) }

View File

@@ -0,0 +1,85 @@
package gstauto
import (
"fmt"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// Empty assignment to ensure PipelineReadWriter satisfies the ReadWritePipeliner interface.
var _ ReadWritePipeliner = &PipelineReadWriter{}
// PipelineReadWriter is the base struct to be used to implement ReadWritePipeliners.
type PipelineReadWriter struct {
*readWriteCloser
pipeline *gst.Pipeline
}
// NewPipelineReadWriter returns a new PipelineReadWriter with an empty pipeline. Use an empty name
// to have gstreamer auto-generate one. This method is intended for use in the construction
// of other interfaces.
func NewPipelineReadWriter(name string) (*PipelineReadWriter, error) {
pipeline, err := gst.NewPipeline(name)
if err != nil {
return nil, err
}
rwCloser, err := newReadWriteCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineReadWriter{
readWriteCloser: rwCloser,
pipeline: pipeline,
}, nil
}
// NewPipelineReadWriterFromString returns a new PipelineReadWriter with a pipeline populated
// by the provided gstreamer launch string. If you are looking to build a simple
// ReadWritePipeliner you probably want to use NewPipelineReadWriterSimpleFromString.
func NewPipelineReadWriterFromString(launchStr string) (*PipelineReadWriter, error) {
pipeline, err := gst.NewPipelineFromString(launchStr)
if err != nil {
return nil, err
}
rwCloser, err := newReadWriteCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineReadWriter{
readWriteCloser: rwCloser,
pipeline: pipeline,
}, nil
}
// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the
// Pipeliner interface.
func (rw *PipelineReadWriter) Pipeline() *gst.Pipeline { return rw.pipeline }
// ReaderFd returns the file descriptor that can be written to for the read-buffer. This value
// is used when wanting to allow an underlying pipeline to write to the internal buffer
// (e.g. when using a fdsink).
func (rw *PipelineReadWriter) ReaderFd() uintptr { return rw.readWriteCloser.readCloser.rWriter.Fd() }
// WriterFd returns the file descriptor that can be used to read from the write-buffer. This value
// is used when wanting to allow an underlying pipeline the ability to read data written to the buffer
// (e.g. when using a fdsrc).
func (rw *PipelineReadWriter) WriterFd() uintptr { return rw.readWriteCloser.writeCloser.wReader.Fd() }
// Close will stop and unref the underlying pipeline and read/write buffers.
func (rw *PipelineReadWriter) Close() error {
if err := rw.Pipeline().Destroy(); err != nil {
return err
}
return rw.readWriteCloser.Close()
}
// CloseAsync will close the underlying pipeline asynchronously. It is the caller's
// responsibility to call Unref on the pipeline and close buffers once it is no longer being used.
// This can be accomplished via calling a regular Close (which is idempotent).
func (rw *PipelineReadWriter) CloseAsync() error { return rw.pipeline.SetState(gst.StateNull) }

View File

@@ -0,0 +1,57 @@
package gstauto
import (
"fmt"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// Blank assignment to make sure PipelinerSimple satisfies the Pipeliner interface.
var _ Pipeliner = &PipelinerSimple{}
// PipelinerSimple is a simple struct that implements the Pipeliner interface.
// It doesn't provide any additional read/write capabilities. Its primary intention
// is for pipelines where the caller does not wish to personally read or write from
// either end of the buffer.
type PipelinerSimple struct {
pipeline *gst.Pipeline
}
// Pipeline implements the Pipeliner interface.
func (s *PipelinerSimple) Pipeline() *gst.Pipeline { return s.pipeline }
// NewPipelinerSimple returns a new empty PipelinerSimple. Pass an empty string
// for name to use an auto-generated one.
func NewPipelinerSimple(name string) (*PipelinerSimple, error) {
pipeline, err := gst.NewPipeline(name)
if err != nil {
return nil, err
}
return &PipelinerSimple{pipeline: pipeline}, nil
}
// NewPipelinerSimpleFromString returns a new PipelinerSimpler from the given
// launch string.
func NewPipelinerSimpleFromString(launchStr string) (*PipelinerSimple, error) {
pipeline, err := gst.NewPipelineFromString(launchStr)
if err != nil {
return nil, err
}
return &PipelinerSimple{pipeline: pipeline}, nil
}
// NewPipelinerSimpleFromConfig returns a new PipelinerSimple from the given
// PipelineConfig.
func NewPipelinerSimpleFromConfig(cfg *PipelineConfig) (*PipelinerSimple, error) {
pipeline, err := gst.NewPipeline("")
if err != nil {
return nil, err
}
if err := cfg.Apply(pipeline); err != nil {
if destroyErr := pipeline.Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
return nil, err
}
return &PipelinerSimple{pipeline: pipeline}, nil
}

View File

@@ -0,0 +1,85 @@
package gstauto
import (
"errors"
"fmt"
"strings"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// PipelineReaderSimple implements a ReadPipeliner that configures gstreamer
// to write directly to the internal read-buffer via an fdsink.
type PipelineReaderSimple struct {
*PipelineReader
}
// NewPipelineReaderSimpleFromString returns a new PipelineReaderSimple populated from
// the given launch string. An fdsink is added to the end of the launch string and tied
// to the read buffer.
func NewPipelineReaderSimpleFromString(launchStr string) (*PipelineReaderSimple, error) {
pipelineReader, err := NewPipelineReaderFromString(addFdSinkToStr(launchStr))
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if destroyErr := pipelineReader.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
}
}()
// Retrieve the sinks in the pipeline, most of the time there is just one
var sinks []*gst.Element
sinks, err = pipelineReader.Pipeline().GetSinkElements()
if err != nil {
return nil, err
}
// Fetch the fdsink and reconfigure it to point to the read buffer.
for _, sink := range sinks {
if strings.Contains(sink.Name(), "fdsink") {
if err = sink.Set("fd", pipelineReader.ReaderFd()); err != nil {
return nil, err
}
}
}
// Return the pipeline
return &PipelineReaderSimple{pipelineReader}, nil
}
// NewPipelineReaderSimpleFromConfig returns a new PipelineReaderSimple populated from
// the given launch config. An fdsink is added to the end of the launch config and tied
// to the read buffer.
func NewPipelineReaderSimpleFromConfig(cfg *PipelineConfig) (*PipelineReaderSimple, error) {
if cfg.Elements == nil {
return nil, errors.New("Elements cannot be nil in the config")
}
pipelineReader, err := NewPipelineReader("")
if err != nil {
return nil, err
}
cfg.Elements = append(cfg.Elements, &PipelineElement{
Name: "fdsink",
Data: map[string]interface{}{
"fd": pipelineReader.ReaderFd(),
},
})
if err := cfg.Apply(pipelineReader.Pipeline()); err != nil {
if destroyErr := pipelineReader.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
return nil, err
}
return &PipelineReaderSimple{pipelineReader}, nil
}
func addFdSinkToStr(pstr string) string {
if pstr == "" {
return "fdsink"
}
return fmt.Sprintf("%s ! fdsink", pstr)
}

View File

@@ -0,0 +1,101 @@
package gstauto
import (
"errors"
"fmt"
"strings"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// PipelineReadWriterSimple implements a ReadWritePipeliner that configures gstreamer
// to read from the internal write-buffer via an fdsrc and write to the internal read-buffer
// via an fdsink.
type PipelineReadWriterSimple struct {
*PipelineReadWriter
}
// NewPipelineReadWriterSimpleFromString returns a new PipelineReadWriterSimple from
// the given launch string. An fdsrc listening on the write buffer and an fdsink to the read buffer
// are formatted into the provided string.
func NewPipelineReadWriterSimpleFromString(launchStr string) (*PipelineReadWriterSimple, error) {
pipelineReadWriter, err := NewPipelineReadWriterFromString(addFdSrcToStr(addFdSinkToStr(launchStr)))
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if destroyErr := pipelineReadWriter.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
}
}()
// Retrieve the sinks in the pipeline, most of the time there is just one
var sinks []*gst.Element
sinks, err = pipelineReadWriter.Pipeline().GetSinkElements()
if err != nil {
return nil, err
}
// Fetch the fdsink and reconfigure it to point to the read buffer.
for _, sink := range sinks {
if strings.Contains(sink.Name(), "fdsink") {
if err = sink.Set("fd", pipelineReadWriter.ReaderFd()); err != nil {
return nil, err
}
}
}
// Retrieve the sources in the pipeline, most of the time there is just one
var sources []*gst.Element
sources, err = pipelineReadWriter.Pipeline().GetSourceElements()
if err != nil {
return nil, err
}
// Fetch the fdsrc and reconfigure it to point to the write buffer.
for _, source := range sources {
if strings.Contains(source.Name(), "fdsrc") {
if err = source.Set("fd", pipelineReadWriter.WriterFd()); err != nil {
return nil, err
}
}
}
// Return the pipeline
return &PipelineReadWriterSimple{pipelineReadWriter}, nil
}
// NewPipelineReadWriterSimpleFromConfig returns a new PipelineReadWriterSimple populated from
// the given launch config. An fdsrc is added to the start of the launch config and tied
// to the write buffer, and an fdsink is added to the end tied to the read-buffer.
func NewPipelineReadWriterSimpleFromConfig(cfg *PipelineConfig) (*PipelineReadWriterSimple, error) {
if cfg.Elements == nil {
return nil, errors.New("Elements cannot be nil in the config")
}
pipelineReadWriter, err := NewPipelineReadWriter("")
if err != nil {
return nil, err
}
cfg.pushPluginToTop(&PipelineElement{
Name: "fdsrc",
Data: map[string]interface{}{
"fd": pipelineReadWriter.WriterFd(),
},
})
cfg.Elements = append(cfg.Elements, &PipelineElement{
Name: "fdsink",
Data: map[string]interface{}{
"fd": pipelineReadWriter.ReaderFd(),
},
})
if err := cfg.Apply(pipelineReadWriter.Pipeline()); err != nil {
if destroyErr := pipelineReadWriter.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
return nil, err
}
return &PipelineReadWriterSimple{pipelineReadWriter}, nil
}

View File

@@ -0,0 +1,85 @@
package gstauto
import (
"errors"
"fmt"
"strings"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// PipelineWriterSimple implements a WritePipeliner that configures gstreamer
// to read directly from the internal write-buffer via a fdsrc.
type PipelineWriterSimple struct {
*PipelineWriter
}
// NewPipelineWriterSimpleFromString returns a new PipelineWriterSimple populated from
// the given launch string. An fdsrc is added to the beginning of the string and tied to
// the write buffer.
func NewPipelineWriterSimpleFromString(launchStr string) (*PipelineWriterSimple, error) {
pipelineWriter, err := NewPipelineWriterFromString(addFdSrcToStr(launchStr))
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if destroyErr := pipelineWriter.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
}
}()
// Retrieve the sources in the pipeline, most of the time there is just one
var sources []*gst.Element
sources, err = pipelineWriter.Pipeline().GetSourceElements()
if err != nil {
return nil, err
}
// Fetch the fdsrc and reconfigure it to point to the write buffer.
for _, source := range sources {
if strings.Contains(source.Name(), "fdsrc") {
if err = source.Set("fd", pipelineWriter.WriterFd()); err != nil {
return nil, err
}
}
}
// Return the pipeline
return &PipelineWriterSimple{pipelineWriter}, nil
}
// NewPipelineWriterSimpleFromConfig returns a new PipelineWriterSimple populated from
// the given launch config. An fdsrc is added to the start of the launch config and tied
// to the write buffer.
func NewPipelineWriterSimpleFromConfig(cfg *PipelineConfig) (*PipelineWriterSimple, error) {
if cfg.Elements == nil {
return nil, errors.New("Elements cannot be nil in the config")
}
pipelineWriter, err := NewPipelineWriter("")
if err != nil {
return nil, err
}
cfg.pushPluginToTop(&PipelineElement{
Name: "fdsrc",
Data: map[string]interface{}{
"fd": pipelineWriter.WriterFd(),
},
})
if err := cfg.Apply(pipelineWriter.Pipeline()); err != nil {
if destroyErr := pipelineWriter.Pipeline().Destroy(); destroyErr != nil {
fmt.Println("[go-gst] Error while destroying failed pipeline instance:", destroyErr.Error())
}
return nil, err
}
return &PipelineWriterSimple{pipelineWriter}, nil
}
func addFdSrcToStr(pstr string) string {
if pstr == "" {
return "fdsrc"
}
return fmt.Sprintf("fdsink ! %s", pstr)
}

View File

@@ -0,0 +1,80 @@
package gstauto
import (
"fmt"
"github.com/tinyzimmer/go-gst-launch/gst"
)
// Empty assignment to ensure PipelineWriter satisfies the WritePipeliner interface.
var _ WritePipeliner = &PipelineWriter{}
// PipelineWriter is the base struct to be used to implement WritePipeliners.
type PipelineWriter struct {
*writeCloser
pipeline *gst.Pipeline
}
// NewPipelineWriter returns a new PipelineWriter with an empty pipeline. Use an empty name
// to have gstreamer auto-generate one. This method is intended for use in the construction
// of other interfaces.
func NewPipelineWriter(name string) (*PipelineWriter, error) {
pipeline, err := gst.NewPipeline(name)
if err != nil {
return nil, err
}
wCloser, err := newWriteCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineWriter{
writeCloser: wCloser,
pipeline: pipeline,
}, nil
}
// NewPipelineWriterFromString returns a new PipelineWriter with a pipeline populated
// by the provided gstreamer launch string. If you are looking to build a simple
// WritePipeliner you probably want to use NewPipelineWriterSimpleFromString.
func NewPipelineWriterFromString(launchStr string) (*PipelineWriter, error) {
pipeline, err := gst.NewPipelineFromString(launchStr)
if err != nil {
return nil, err
}
wCloser, err := newWriteCloser()
if err != nil {
if closeErr := pipeline.Destroy(); closeErr != nil {
fmt.Println("[gst-auto] Failed to destroy errored pipeline:", closeErr.Error())
}
return nil, err
}
return &PipelineWriter{
writeCloser: wCloser,
pipeline: pipeline,
}, nil
}
// Pipeline returns the underlying Pipeline instance for this pipeliner. It implements the
// Pipeliner interface.
func (w *PipelineWriter) Pipeline() *gst.Pipeline { return w.pipeline }
// WriterFd returns the file descriptor that can be used to read from the write-buffer. This value
// is used when wanting to allow an underlying pipeline the ability to read data written to
// the buffer (e.g. when using a fdsrc).
func (w *PipelineWriter) WriterFd() uintptr { return w.writeCloser.wReader.Fd() }
// Close will stop and unref the underlying pipeline.
func (w *PipelineWriter) Close() error {
if err := w.Pipeline().Destroy(); err != nil {
return err
}
return w.writeCloser.Close()
}
// CloseAsync will close the underlying pipeline asynchronously. It is the caller's
// responsibility to call Unref on the pipeline and close buffers once it is no longer being used.
// This can be accomplished via calling a regular Close (which is idempotent).
func (w *PipelineWriter) CloseAsync() error { return w.pipeline.SetState(gst.StateNull) }