mirror of
https://github.com/go-gst/go-gst.git
synced 2025-10-05 07:56:51 +08:00
major move of all glib specific bindings and extendable objects from go runtime to go-glib package
This commit is contained in:
@@ -59,13 +59,13 @@ var CAT = gst.NewDebugCategory(
|
||||
// This element only has a single property, the location of the file to write to.
|
||||
// When getting and setting properties later on, you will reference them by their index in
|
||||
// this list.
|
||||
var properties = []*gst.ParamSpec{
|
||||
gst.NewStringParam(
|
||||
var properties = []*glib.ParamSpec{
|
||||
glib.NewStringParam(
|
||||
"location", // The name of the parameter
|
||||
"File Location", // The long name for the parameter
|
||||
"Location to write the file to", // A blurb about the parameter
|
||||
nil, // A default value for the parameter
|
||||
gst.ParameterReadWrite, // Flags for the parameter
|
||||
glib.ParameterReadWrite, // Flags for the parameter
|
||||
),
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ type settings struct {
|
||||
location string
|
||||
}
|
||||
|
||||
// Finally a structure is defined that implements (at a minimum) the gst.GoElement interface.
|
||||
// Finally a structure is defined that implements (at a minimum) the glib.GoObject interface.
|
||||
// It is possible to signal to the bindings to inherit from other classes or implement other
|
||||
// interfaces via the registration and TypeInit processes.
|
||||
type fileSink struct {
|
||||
@@ -109,10 +109,9 @@ func (f *fileSink) setLocation(path string) error {
|
||||
// element and its capabilities with the type system. These are the minimum methods that
|
||||
// should be implemented by an element.
|
||||
|
||||
// Every element needs to provide its own constructor that returns an initialized
|
||||
// gst.GoElement implementation. Here we simply create a new fileSink with zeroed settings
|
||||
// and state objects.
|
||||
func (f *fileSink) New() gst.GoElement {
|
||||
// Every element needs to provide its own constructor that returns an initialized glib.GoObjectSubclass
|
||||
// implementation. Here we simply create a new fileSink with zeroed settings and state objects.
|
||||
func (f *fileSink) New() glib.GoObjectSubclass {
|
||||
CAT.Log(gst.LevelLog, "Initializing new fileSink object")
|
||||
return &fileSink{
|
||||
settings: &settings{},
|
||||
@@ -122,29 +121,30 @@ func (f *fileSink) New() gst.GoElement {
|
||||
|
||||
// The TypeInit method should register any additional interfaces provided by the element.
|
||||
// In this example we signal to the type system that we also implement the GstURIHandler interface.
|
||||
func (f *fileSink) TypeInit(instance *gst.TypeInstance) {
|
||||
func (f *fileSink) TypeInit(instance *glib.TypeInstance) {
|
||||
CAT.Log(gst.LevelLog, "Adding URIHandler interface to type")
|
||||
instance.AddInterface(gst.InterfaceURIHandler)
|
||||
}
|
||||
|
||||
// The ClassInit method should specify the metadata for this element and add any pad templates
|
||||
// and properties.
|
||||
func (f *fileSink) ClassInit(klass *gst.ElementClass) {
|
||||
func (f *fileSink) ClassInit(klass *glib.ObjectClass) {
|
||||
CAT.Log(gst.LevelLog, "Initializing gofilesink class")
|
||||
klass.SetMetadata(
|
||||
class := gst.ToElementClass(klass)
|
||||
class.SetMetadata(
|
||||
"File Sink",
|
||||
"Sink/File",
|
||||
"Write stream to a file",
|
||||
"Avi Zimmerman <avi.zimmerman@gmail.com>",
|
||||
)
|
||||
CAT.Log(gst.LevelLog, "Adding sink pad template and properties to class")
|
||||
klass.AddPadTemplate(gst.NewPadTemplate(
|
||||
class.AddPadTemplate(gst.NewPadTemplate(
|
||||
"sink",
|
||||
gst.PadDirectionSink,
|
||||
gst.PadPresenceAlways,
|
||||
gst.NewAnyCaps(),
|
||||
))
|
||||
klass.InstallProperties(properties)
|
||||
class.InstallProperties(properties)
|
||||
}
|
||||
|
||||
// Object implementations are used during the initialization of an element. The
|
||||
|
@@ -59,13 +59,13 @@ var CAT = gst.NewDebugCategory(
|
||||
// This element only has a single property, the location of the file to read from.
|
||||
// When getting and setting properties later on, you will reference them by their index in
|
||||
// this list.
|
||||
var properties = []*gst.ParamSpec{
|
||||
gst.NewStringParam(
|
||||
var properties = []*glib.ParamSpec{
|
||||
glib.NewStringParam(
|
||||
"location", // The name of the parameter
|
||||
"File Location", // The long name for the parameter
|
||||
"Location of the file to read from", // A blurb about the parameter
|
||||
nil, // A default value for the parameter
|
||||
gst.ParameterReadWrite, // Flags for the parameter
|
||||
glib.ParameterReadWrite, // Flags for the parameter
|
||||
),
|
||||
}
|
||||
|
||||
@@ -114,9 +114,8 @@ func (f *fileSrc) setLocation(path string) error {
|
||||
// should be implemented by an element.
|
||||
|
||||
// Every element needs to provide its own constructor that returns an initialized
|
||||
// gst.GoElement implementation. Here we simply create a new fileSrc with zeroed settings
|
||||
// and state objects.
|
||||
func (f *fileSrc) New() gst.GoElement {
|
||||
// glib.GoObjectSubclass and state objects.
|
||||
func (f *fileSrc) New() glib.GoObjectSubclass {
|
||||
CAT.Log(gst.LevelLog, "Initializing new fileSrc object")
|
||||
return &fileSrc{
|
||||
settings: &settings{},
|
||||
@@ -126,29 +125,30 @@ func (f *fileSrc) New() gst.GoElement {
|
||||
|
||||
// The TypeInit method should register any additional interfaces provided by the element.
|
||||
// In this example we signal to the type system that we also implement the GstURIHandler interface.
|
||||
func (f *fileSrc) TypeInit(instance *gst.TypeInstance) {
|
||||
func (f *fileSrc) TypeInit(instance *glib.TypeInstance) {
|
||||
CAT.Log(gst.LevelLog, "Adding URIHandler interface to type")
|
||||
instance.AddInterface(gst.InterfaceURIHandler)
|
||||
}
|
||||
|
||||
// The ClassInit method should specify the metadata for this element and add any pad templates
|
||||
// and properties.
|
||||
func (f *fileSrc) ClassInit(klass *gst.ElementClass) {
|
||||
func (f *fileSrc) ClassInit(klass *glib.ObjectClass) {
|
||||
CAT.Log(gst.LevelLog, "Initializing gofilesrc class")
|
||||
klass.SetMetadata(
|
||||
class := gst.ToElementClass(klass)
|
||||
class.SetMetadata(
|
||||
"File Source",
|
||||
"Source/File",
|
||||
"Read stream from a file",
|
||||
"Avi Zimmerman <avi.zimmerman@gmail.com>",
|
||||
)
|
||||
CAT.Log(gst.LevelLog, "Adding src pad template and properties to class")
|
||||
klass.AddPadTemplate(gst.NewPadTemplate(
|
||||
class.AddPadTemplate(gst.NewPadTemplate(
|
||||
"src",
|
||||
gst.PadDirectionSource,
|
||||
gst.PadPresenceAlways,
|
||||
gst.NewAnyCaps(),
|
||||
))
|
||||
klass.InstallProperties(properties)
|
||||
class.InstallProperties(properties)
|
||||
}
|
||||
|
||||
// Object implementations are used during the initialization of an element. The
|
||||
|
@@ -1,36 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
const (
|
||||
accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID"
|
||||
secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultEndpoint = "play.min.io"
|
||||
defaultUseTLS = true
|
||||
defaultRegion = "us-east-1"
|
||||
)
|
||||
|
||||
type settings struct {
|
||||
endpoint string
|
||||
useTLS bool
|
||||
region string
|
||||
bucket string
|
||||
key string
|
||||
accessKeyID string
|
||||
secretAccessKey string
|
||||
}
|
||||
|
||||
func defaultSettings() *settings {
|
||||
return &settings{
|
||||
endpoint: defaultEndpoint,
|
||||
useTLS: defaultUseTLS,
|
||||
region: defaultRegion,
|
||||
accessKeyID: os.Getenv(accessKeyIDEnvVar),
|
||||
secretAccessKey: os.Getenv(secretAccessKeyEnvVar),
|
||||
}
|
||||
}
|
@@ -1,267 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
||||
"github.com/tinyzimmer/go-glib/glib"
|
||||
"github.com/tinyzimmer/go-gst/gst"
|
||||
"github.com/tinyzimmer/go-gst/gst/base"
|
||||
)
|
||||
|
||||
var sinkCAT = gst.NewDebugCategory(
|
||||
"miniosink",
|
||||
gst.DebugColorNone,
|
||||
"MinIOSink Element",
|
||||
)
|
||||
|
||||
var sinkProperties = []*gst.ParamSpec{
|
||||
gst.NewStringParam(
|
||||
"endpoint",
|
||||
"S3 API Endpoint",
|
||||
"The endpoint for the S3 API server",
|
||||
&defaultEndpoint,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewBoolParam(
|
||||
"use-tls",
|
||||
"Use TLS",
|
||||
"Use HTTPS for API requests",
|
||||
defaultUseTLS,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"region",
|
||||
"Bucket region",
|
||||
"The region where the bucket is",
|
||||
&defaultRegion,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"bucket",
|
||||
"Bucket name",
|
||||
"The name of the MinIO bucket",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"key",
|
||||
"Object key",
|
||||
"The key of the object inside the bucket",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"access-key-id",
|
||||
"Access Key ID",
|
||||
"The access key ID to use for authentication",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"secret-access-key",
|
||||
"Secret Access Key",
|
||||
"The secret access key to use for authentication",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
}
|
||||
|
||||
type minioSink struct {
|
||||
settings *settings
|
||||
state *sinkstate
|
||||
}
|
||||
|
||||
type sinkstate struct {
|
||||
started bool
|
||||
rPipe io.ReadCloser
|
||||
wPipe io.WriteCloser
|
||||
doneChan chan struct{}
|
||||
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
func (m *minioSink) New() gst.GoElement {
|
||||
srcCAT.Log(gst.LevelLog, "Creating new minioSink object")
|
||||
return &minioSink{
|
||||
settings: defaultSettings(),
|
||||
state: &sinkstate{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *minioSink) TypeInit(*gst.TypeInstance) {}
|
||||
|
||||
func (m *minioSink) ClassInit(klass *gst.ElementClass) {
|
||||
sinkCAT.Log(gst.LevelLog, "Initializing miniosink class")
|
||||
klass.SetMetadata(
|
||||
"MinIO Sink",
|
||||
"Sink/File",
|
||||
"Write stream to a MinIO object",
|
||||
"Avi Zimmerman <avi.zimmerman@gmail.com>",
|
||||
)
|
||||
sinkCAT.Log(gst.LevelLog, "Adding sink pad template and properties to class")
|
||||
klass.AddPadTemplate(gst.NewPadTemplate(
|
||||
"sink",
|
||||
gst.PadDirectionSink,
|
||||
gst.PadPresenceAlways,
|
||||
gst.NewAnyCaps(),
|
||||
))
|
||||
klass.InstallProperties(sinkProperties)
|
||||
}
|
||||
|
||||
func (m *minioSink) SetProperty(self *gst.Object, id uint, value *glib.Value) {
|
||||
prop := sinkProperties[id]
|
||||
|
||||
val, err := value.GoValue()
|
||||
if err != nil {
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
|
||||
fmt.Sprintf("Could not coerce %v to go value", value), err.Error())
|
||||
}
|
||||
|
||||
switch prop.Name() {
|
||||
case "endpoint":
|
||||
m.settings.endpoint = val.(string)
|
||||
case "use-tls":
|
||||
m.settings.useTLS = val.(bool)
|
||||
case "region":
|
||||
m.settings.region = val.(string)
|
||||
case "bucket":
|
||||
m.settings.bucket = val.(string)
|
||||
case "key":
|
||||
m.settings.key = val.(string)
|
||||
case "access-key-id":
|
||||
m.settings.accessKeyID = val.(string)
|
||||
case "secret-access-key":
|
||||
m.settings.secretAccessKey = val.(string)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (m *minioSink) GetProperty(self *gst.Object, id uint) *glib.Value {
|
||||
prop := sinkProperties[id]
|
||||
|
||||
var localVal interface{}
|
||||
|
||||
switch prop.Name() {
|
||||
case "endpoint":
|
||||
localVal = m.settings.endpoint
|
||||
case "use-tls":
|
||||
localVal = m.settings.useTLS
|
||||
case "region":
|
||||
localVal = m.settings.region
|
||||
case "bucket":
|
||||
localVal = m.settings.bucket
|
||||
case "key":
|
||||
localVal = m.settings.key
|
||||
case "access-key-id":
|
||||
localVal = m.settings.accessKeyID
|
||||
case "secret-access-key":
|
||||
localVal = m.settings.secretAccessKey
|
||||
|
||||
default:
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
|
||||
fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "")
|
||||
return nil
|
||||
}
|
||||
|
||||
val, err := glib.GValue(localVal)
|
||||
if err != nil {
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
|
||||
fmt.Sprintf("Could not convert %v to GValue", localVal),
|
||||
err.Error(),
|
||||
)
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func (m *minioSink) Start(self *base.GstBaseSink) bool {
|
||||
if m.state.started {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
|
||||
"MinIOSink is already started", "")
|
||||
return false
|
||||
}
|
||||
|
||||
if m.settings.bucket == "" {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
|
||||
"No bucket configured on the miniosink", "")
|
||||
return false
|
||||
}
|
||||
|
||||
if m.settings.key == "" {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
|
||||
"No bucket configured on the miniosink", "")
|
||||
return false
|
||||
}
|
||||
|
||||
m.state.doneChan = make(chan struct{})
|
||||
|
||||
client, err := minio.New(m.settings.endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(m.settings.accessKeyID, m.settings.secretAccessKey, ""),
|
||||
Secure: m.settings.useTLS,
|
||||
Region: m.settings.region,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
|
||||
fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
m.state.rPipe, m.state.wPipe = io.Pipe()
|
||||
|
||||
go func() {
|
||||
self.Log(sinkCAT, gst.LevelInfo,
|
||||
fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key),
|
||||
)
|
||||
if _, err := client.PutObject(context.Background(),
|
||||
m.settings.bucket, m.settings.key,
|
||||
m.state.rPipe, -1,
|
||||
minio.PutObjectOptions{
|
||||
ContentType: "application/octet-stream",
|
||||
}); err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
|
||||
fmt.Sprintf("Error during PutObject call to %s/%s", m.settings.bucket, m.settings.key), err.Error())
|
||||
}
|
||||
m.state.doneChan <- struct{}{}
|
||||
}()
|
||||
|
||||
m.state.started = true
|
||||
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started")
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *minioSink) Stop(self *base.GstBaseSink) bool {
|
||||
if !m.state.started {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
|
||||
return false
|
||||
}
|
||||
if err := m.state.wPipe.Close(); err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Failed to close the write pipe", err.Error())
|
||||
return false
|
||||
}
|
||||
self.Log(sinkCAT, gst.LevelInfo, "Waiting for PutObject operation to complete")
|
||||
<-m.state.doneChan
|
||||
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped")
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn {
|
||||
if !m.state.started {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
|
||||
return gst.FlowError
|
||||
}
|
||||
|
||||
self.Log(sinkCAT, gst.LevelLog, fmt.Sprintf("Rendering buffer %v", buffer))
|
||||
if _, err := io.Copy(m.state.wPipe, buffer.Reader()); err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Error copying buffer to write pipe", err.Error())
|
||||
return gst.FlowError
|
||||
}
|
||||
|
||||
return gst.FlowOK
|
||||
}
|
@@ -1,324 +0,0 @@
|
||||
// This example demonstrates a src element that reads from objects in a minio bucket.
|
||||
// Since minio implements the S3 API this plugin could also be used for S3 buckets by
|
||||
// setting the correct endpoints and credentials.
|
||||
//
|
||||
// By default this plugin will use the credentials set in the environment at MINIO_ACCESS_KEY_ID
|
||||
// and MINIO_SECRET_ACCESS_KEY however these can also be set on the element directly.
|
||||
//
|
||||
//
|
||||
// In order to build the plugin for use by GStreamer, you can do the following:
|
||||
//
|
||||
// $ go build -o libgstminiosrc.so -buildmode c-shared .
|
||||
//
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
||||
"github.com/tinyzimmer/go-glib/glib"
|
||||
"github.com/tinyzimmer/go-gst/gst"
|
||||
"github.com/tinyzimmer/go-gst/gst/base"
|
||||
)
|
||||
|
||||
var srcCAT = gst.NewDebugCategory(
|
||||
"miniosrc",
|
||||
gst.DebugColorNone,
|
||||
"MinIOSrc Element",
|
||||
)
|
||||
|
||||
var srcProperties = []*gst.ParamSpec{
|
||||
gst.NewStringParam(
|
||||
"endpoint",
|
||||
"S3 API Endpoint",
|
||||
"The endpoint for the S3 API server",
|
||||
&defaultEndpoint,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewBoolParam(
|
||||
"use-tls",
|
||||
"Use TLS",
|
||||
"Use HTTPS for API requests",
|
||||
defaultUseTLS,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"region",
|
||||
"Bucket region",
|
||||
"The region where the bucket is",
|
||||
&defaultRegion,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"bucket",
|
||||
"Bucket name",
|
||||
"The name of the MinIO bucket",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"key",
|
||||
"Object key",
|
||||
"The key of the object inside the bucket",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"access-key-id",
|
||||
"Access Key ID",
|
||||
"The access key ID to use for authentication",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
gst.NewStringParam(
|
||||
"secret-access-key",
|
||||
"Secret Access Key",
|
||||
"The secret access key to use for authentication",
|
||||
nil,
|
||||
gst.ParameterReadWrite,
|
||||
),
|
||||
}
|
||||
|
||||
type minioSrc struct {
|
||||
settings *settings
|
||||
state *srcstate
|
||||
}
|
||||
|
||||
type srcstate struct {
|
||||
started bool
|
||||
object *minio.Object
|
||||
objInfo minio.ObjectInfo
|
||||
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
func (m *minioSrc) New() gst.GoElement {
|
||||
srcCAT.Log(gst.LevelLog, "Creating new minioSrc object")
|
||||
return &minioSrc{
|
||||
settings: defaultSettings(),
|
||||
state: &srcstate{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *minioSrc) TypeInit(*gst.TypeInstance) {}
|
||||
|
||||
func (m *minioSrc) ClassInit(klass *gst.ElementClass) {
|
||||
srcCAT.Log(gst.LevelLog, "Initializing miniosrc class")
|
||||
klass.SetMetadata(
|
||||
"MinIO Source",
|
||||
"Source/File",
|
||||
"Read stream from a MinIO object",
|
||||
"Avi Zimmerman <avi.zimmerman@gmail.com>",
|
||||
)
|
||||
srcCAT.Log(gst.LevelLog, "Adding src pad template and properties to class")
|
||||
klass.AddPadTemplate(gst.NewPadTemplate(
|
||||
"src",
|
||||
gst.PadDirectionSource,
|
||||
gst.PadPresenceAlways,
|
||||
gst.NewAnyCaps(),
|
||||
))
|
||||
klass.InstallProperties(srcProperties)
|
||||
}
|
||||
|
||||
func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) {
|
||||
prop := srcProperties[id]
|
||||
|
||||
val, err := value.GoValue()
|
||||
if err != nil {
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
|
||||
fmt.Sprintf("Could not coerce %v to go value", value), err.Error())
|
||||
}
|
||||
|
||||
switch prop.Name() {
|
||||
case "endpoint":
|
||||
m.settings.endpoint = val.(string)
|
||||
case "use-tls":
|
||||
m.settings.useTLS = val.(bool)
|
||||
case "region":
|
||||
m.settings.region = val.(string)
|
||||
case "bucket":
|
||||
m.settings.bucket = val.(string)
|
||||
case "key":
|
||||
m.settings.key = val.(string)
|
||||
case "access-key-id":
|
||||
m.settings.accessKeyID = val.(string)
|
||||
case "secret-access-key":
|
||||
m.settings.secretAccessKey = val.(string)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value {
|
||||
prop := srcProperties[id]
|
||||
|
||||
var localVal interface{}
|
||||
|
||||
switch prop.Name() {
|
||||
case "endpoint":
|
||||
localVal = m.settings.endpoint
|
||||
case "use-tls":
|
||||
localVal = m.settings.useTLS
|
||||
case "region":
|
||||
localVal = m.settings.region
|
||||
case "bucket":
|
||||
localVal = m.settings.bucket
|
||||
case "key":
|
||||
localVal = m.settings.key
|
||||
case "access-key-id":
|
||||
localVal = m.settings.accessKeyID
|
||||
case "secret-access-key":
|
||||
localVal = m.settings.secretAccessKey
|
||||
|
||||
default:
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
|
||||
fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "")
|
||||
return nil
|
||||
}
|
||||
|
||||
val, err := glib.GValue(localVal)
|
||||
if err != nil {
|
||||
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
|
||||
fmt.Sprintf("Could not convert %v to GValue", localVal),
|
||||
err.Error(),
|
||||
)
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func (m *minioSrc) Constructed(self *gst.Object) {
|
||||
self.Log(srcCAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes")
|
||||
base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes)
|
||||
}
|
||||
|
||||
func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return true }
|
||||
|
||||
func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) {
|
||||
if !m.state.started {
|
||||
return false, 0
|
||||
}
|
||||
return true, m.state.objInfo.Size
|
||||
}
|
||||
|
||||
func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
|
||||
m.state.mux.Lock()
|
||||
|
||||
if m.state.started {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "MinIOSrc is already started", "")
|
||||
return false
|
||||
}
|
||||
|
||||
if m.settings.bucket == "" {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No source bucket defined", "")
|
||||
return false
|
||||
}
|
||||
|
||||
if m.settings.key == "" {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, "No object key defined", "")
|
||||
return false
|
||||
}
|
||||
|
||||
client, err := minio.New(m.settings.endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(m.settings.accessKeyID, m.settings.secretAccessKey, ""),
|
||||
Secure: m.settings.useTLS,
|
||||
Region: m.settings.region,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
|
||||
fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint))
|
||||
m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
|
||||
fmt.Sprintf("Failed to retrieve object %q from bucket %q", m.settings.key, m.settings.bucket), err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
self.Log(srcCAT, gst.LevelInfo, "Getting HEAD for object")
|
||||
m.state.objInfo, err = m.state.object.Stat()
|
||||
if err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
|
||||
fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "")
|
||||
return false
|
||||
}
|
||||
self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo))
|
||||
|
||||
m.state.started = true
|
||||
|
||||
m.state.mux.Unlock()
|
||||
|
||||
self.StartComplete(gst.FlowOK)
|
||||
|
||||
self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has started")
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *minioSrc) Stop(self *base.GstBaseSrc) bool {
|
||||
m.state.mux.Lock()
|
||||
defer m.state.mux.Unlock()
|
||||
|
||||
if !m.state.started {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSrc is not started", "")
|
||||
return false
|
||||
}
|
||||
|
||||
if err := m.state.object.Close(); err != nil {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, "Failed to close the bucket object", err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
m.state.object = nil
|
||||
m.state.started = false
|
||||
|
||||
self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has stopped")
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer *gst.Buffer) gst.FlowReturn {
|
||||
|
||||
if !m.state.started || m.state.object == nil {
|
||||
self.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "MinIOSrc is not started yet", "")
|
||||
return gst.FlowError
|
||||
}
|
||||
|
||||
self.Log(srcCAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size))
|
||||
|
||||
m.state.mux.Lock()
|
||||
defer m.state.mux.Unlock()
|
||||
|
||||
data := make([]byte, size)
|
||||
read, err := m.state.object.ReadAt(data, int64(offset))
|
||||
if err != nil && err != io.EOF {
|
||||
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorRead,
|
||||
fmt.Sprintf("Failed to read %d bytes from object at offset %d", size, offset), err.Error())
|
||||
return gst.FlowError
|
||||
}
|
||||
|
||||
if read < int(size) {
|
||||
self.Log(srcCAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read))
|
||||
trim := make([]byte, read)
|
||||
copy(trim, data)
|
||||
data = trim
|
||||
}
|
||||
|
||||
bufmap := buffer.Map(gst.MapWrite)
|
||||
if bufmap == nil {
|
||||
self.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, "Failed to map buffer", "")
|
||||
return gst.FlowError
|
||||
}
|
||||
defer buffer.Unmap()
|
||||
|
||||
bufmap.WriteData(data)
|
||||
buffer.SetSize(int64(read))
|
||||
|
||||
return gst.FlowOK
|
||||
}
|
@@ -1,61 +0,0 @@
|
||||
package main
|
||||
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/tinyzimmer/go-gst/gst"
|
||||
"github.com/tinyzimmer/go-gst/gst/base"
|
||||
)
|
||||
|
||||
// The metadata for this plugin
|
||||
var pluginMeta = &gst.PluginMetadata{
|
||||
MajorVersion: gst.VersionMajor,
|
||||
MinorVersion: gst.VersionMinor,
|
||||
Name: "miniosrc",
|
||||
Description: "GStreamer plugins for reading and writing from Minio",
|
||||
Version: "v0.0.1",
|
||||
License: gst.LicenseLGPL,
|
||||
Source: "go-gst",
|
||||
Package: "examples",
|
||||
Origin: "https://github.com/tinyzimmer/go-gst",
|
||||
ReleaseDate: "2021-01-11",
|
||||
// The init function is called to register elements provided by the plugin.
|
||||
Init: func(plugin *gst.Plugin) bool {
|
||||
if ok := gst.RegisterElement(
|
||||
plugin,
|
||||
// The name of the element
|
||||
"miniosrc",
|
||||
// The rank of the element
|
||||
gst.RankNone,
|
||||
// The GoElement implementation for the element
|
||||
&minioSrc{},
|
||||
// The base subclass this element extends
|
||||
base.ExtendsBaseSrc,
|
||||
); !ok {
|
||||
return ok
|
||||
}
|
||||
|
||||
if ok := gst.RegisterElement(
|
||||
plugin,
|
||||
// The name of the element
|
||||
"miniosink",
|
||||
// The rank of the element
|
||||
gst.RankNone,
|
||||
// The GoElement implementation for the element
|
||||
&minioSink{},
|
||||
// The base subclass this element extends
|
||||
base.ExtendsBaseSink,
|
||||
); !ok {
|
||||
return ok
|
||||
}
|
||||
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
func main() {}
|
||||
|
||||
//export gst_plugin_minio_get_desc
|
||||
func gst_plugin_minio_get_desc() unsafe.Pointer { return pluginMeta.Export() }
|
@@ -55,29 +55,29 @@ var CAT = gst.NewDebugCategory(
|
||||
"WebsocketSrc Element",
|
||||
)
|
||||
|
||||
var properties = []*gst.ParamSpec{
|
||||
gst.NewStringParam(
|
||||
var properties = []*glib.ParamSpec{
|
||||
glib.NewStringParam(
|
||||
"address",
|
||||
"Server Address",
|
||||
"The address to bind the server to",
|
||||
&DefaultAddress,
|
||||
gst.ParameterReadWrite,
|
||||
glib.ParameterReadWrite,
|
||||
),
|
||||
gst.NewIntParam(
|
||||
glib.NewIntParam(
|
||||
"port",
|
||||
"Server Port",
|
||||
"The port to bind the server to",
|
||||
1024, 65535,
|
||||
DefaultPort,
|
||||
gst.ParameterReadWrite,
|
||||
glib.ParameterReadWrite,
|
||||
),
|
||||
// not implemented yet
|
||||
gst.NewBoolParam(
|
||||
glib.NewBoolParam(
|
||||
"retrieve-remote-addr",
|
||||
"Retrieve Remote Address",
|
||||
"Include the remote client's address in the buffer metadata",
|
||||
DefaultRetrieveRemoteAddr,
|
||||
gst.ParameterReadWrite,
|
||||
glib.ParameterReadWrite,
|
||||
),
|
||||
}
|
||||
|
||||
@@ -398,29 +398,30 @@ func (w *websocketSrc) setupSrcPad(elem *gst.Element) {
|
||||
|
||||
// * ObjectSubclass * //
|
||||
|
||||
func (w *websocketSrc) New() gst.GoElement {
|
||||
func (w *websocketSrc) New() glib.GoObjectSubclass {
|
||||
return &websocketSrc{
|
||||
settings: defaultSettings(),
|
||||
state: &state{},
|
||||
}
|
||||
}
|
||||
|
||||
func (w *websocketSrc) TypeInit(instance *gst.TypeInstance) {}
|
||||
func (w *websocketSrc) TypeInit(instance *glib.TypeInstance) {}
|
||||
|
||||
func (w *websocketSrc) ClassInit(klass *gst.ElementClass) {
|
||||
klass.SetMetadata(
|
||||
func (w *websocketSrc) ClassInit(klass *glib.ObjectClass) {
|
||||
class := gst.ToElementClass(klass)
|
||||
class.SetMetadata(
|
||||
"Websocket Src",
|
||||
"Src/Websocket",
|
||||
"Write stream from a connection over a websocket server",
|
||||
"Avi Zimmerman <avi.zimmerman@gmail.com>",
|
||||
)
|
||||
klass.AddPadTemplate(gst.NewPadTemplate(
|
||||
class.AddPadTemplate(gst.NewPadTemplate(
|
||||
"src",
|
||||
gst.PadDirectionSource,
|
||||
gst.PadPresenceAlways,
|
||||
gst.NewAnyCaps(),
|
||||
))
|
||||
klass.InstallProperties(properties)
|
||||
class.InstallProperties(properties)
|
||||
}
|
||||
|
||||
// * Object * //
|
||||
|
Reference in New Issue
Block a user