minio/s3 sink plugin

This commit is contained in:
Avi Zimmerman
2021-01-12 08:31:29 +02:00
parent b603bef498
commit 1ebccda0a5
4 changed files with 388 additions and 91 deletions

View File

@@ -0,0 +1,36 @@
package main
import (
"os"
)
const (
accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID"
secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY"
)
var (
defaultEndpoint = "play.min.io"
defaultUseTLS = true
defaultRegion = "us-east-1"
)
type settings struct {
endpoint string
useTLS bool
region string
bucket string
key string
accessKeyID string
secretAccessKey string
}
func defaultSettings() *settings {
return &settings{
endpoint: defaultEndpoint,
useTLS: defaultUseTLS,
region: defaultRegion,
accessKeyID: os.Getenv(accessKeyIDEnvVar),
secretAccessKey: os.Getenv(secretAccessKeyEnvVar),
}
}

View File

@@ -0,0 +1,266 @@
package main
import (
"context"
"fmt"
"io"
"sync"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/tinyzimmer/go-glib/glib"
"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/base"
)
var sinkCAT = gst.NewDebugCategory(
"miniosink",
gst.DebugColorNone,
"MinIOSink Element",
)
var sinkProperties = []*gst.ParamSpec{
gst.NewStringParam(
"endpoint",
"S3 API Endpoint",
"The endpoint for the S3 API server",
&defaultEndpoint,
gst.ParameterReadWrite,
),
gst.NewBoolParam(
"use-tls",
"Use TLS",
"Use HTTPS for API requests",
defaultUseTLS,
gst.ParameterReadWrite,
),
gst.NewStringParam(
"region",
"Bucket region",
"The region where the bucket is",
&defaultRegion,
gst.ParameterReadWrite,
),
gst.NewStringParam(
"bucket",
"Bucket name",
"The name of the MinIO bucket",
nil,
gst.ParameterReadWrite,
),
gst.NewStringParam(
"key",
"Object key",
"The key of the object inside the bucket",
nil,
gst.ParameterReadWrite,
),
gst.NewStringParam(
"access-key-id",
"Access Key ID",
"The access key ID to use for authentication",
nil,
gst.ParameterReadWrite,
),
gst.NewStringParam(
"secret-access-key",
"Secret Access Key",
"The secret access key to use for authentication",
nil,
gst.ParameterReadWrite,
),
}
type minioSink struct {
settings *settings
state *sinkstate
}
type sinkstate struct {
started bool
rPipe io.ReadCloser
wPipe io.WriteCloser
doneChan chan struct{}
mux sync.Mutex
}
func (m *minioSink) New() gst.GoElement {
srcCAT.Log(gst.LevelLog, "Creating new minioSink object")
return &minioSink{
settings: defaultSettings(),
state: &sinkstate{},
}
}
func (m *minioSink) TypeInit(*gst.TypeInstance) {}
func (m *minioSink) ClassInit(klass *gst.ElementClass) {
sinkCAT.Log(gst.LevelLog, "Initializing miniosink class")
klass.SetMetadata(
"MinIO Sink",
"Sink/File",
"Write stream to a MinIO object",
"Avi Zimmerman <avi.zimmerman@gmail.com>",
)
sinkCAT.Log(gst.LevelLog, "Adding sink pad template and properties to class")
klass.AddPadTemplate(gst.NewPadTemplate(
"sink",
gst.PadDirectionSink,
gst.PadPresenceAlways,
gst.NewAnyCaps(),
))
klass.InstallProperties(sinkProperties)
}
func (m *minioSink) SetProperty(self *gst.Object, id uint, value *glib.Value) {
prop := sinkProperties[id]
val, err := value.GoValue()
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Could not coerce %v to go value", value), err.Error())
}
switch prop.Name() {
case "endpoint":
m.settings.endpoint = val.(string)
case "use-tls":
m.settings.useTLS = val.(bool)
case "region":
m.settings.region = val.(string)
case "bucket":
m.settings.bucket = val.(string)
case "key":
m.settings.key = val.(string)
case "access-key-id":
m.settings.accessKeyID = val.(string)
case "secret-access-key":
m.settings.secretAccessKey = val.(string)
}
}
func (m *minioSink) GetProperty(self *gst.Object, id uint) *glib.Value {
prop := sinkProperties[id]
var localVal interface{}
switch prop.Name() {
case "endpoint":
localVal = m.settings.endpoint
case "use-tls":
localVal = m.settings.useTLS
case "region":
localVal = m.settings.region
case "bucket":
localVal = m.settings.bucket
case "key":
localVal = m.settings.key
case "access-key-id":
localVal = m.settings.accessKeyID
case "secret-access-key":
localVal = m.settings.secretAccessKey
default:
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "")
return nil
}
val, err := glib.GValue(localVal)
if err != nil {
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed,
fmt.Sprintf("Could not convert %v to GValue", localVal),
err.Error(),
)
}
return val
}
func (m *minioSink) Start(self *base.GstBaseSink) bool {
if m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"MinIOSink is already started", "")
return false
}
if m.settings.bucket == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"No bucket configured on the miniosink", "")
return false
}
if m.settings.key == "" {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings,
"No bucket configured on the miniosink", "")
return false
}
m.state.doneChan = make(chan struct{})
client, err := minio.New(m.settings.endpoint, &minio.Options{
Creds: credentials.NewStaticV4(m.settings.accessKeyID, m.settings.secretAccessKey, ""),
Secure: m.settings.useTLS,
})
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error())
return false
}
m.state.rPipe, m.state.wPipe = io.Pipe()
go func() {
self.Log(sinkCAT, gst.LevelInfo,
fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key),
)
if _, err := client.PutObject(context.Background(),
m.settings.bucket, m.settings.key,
m.state.rPipe, -1,
minio.PutObjectOptions{
ContentType: "application/octet-stream",
}); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Error during PutObject call to %s/%s", m.settings.bucket, m.settings.key), err.Error())
}
m.state.doneChan <- struct{}{}
}()
m.state.started = true
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started")
return true
}
func (m *minioSink) Stop(self *base.GstBaseSink) bool {
if !m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
return false
}
if err := m.state.wPipe.Close(); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Failed to close the write pipe", err.Error())
return false
}
self.Log(sinkCAT, gst.LevelInfo, "Waiting for PutObject operation to complete")
<-m.state.doneChan
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped")
return true
}
func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn {
if !m.state.started {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "")
return gst.FlowError
}
self.Log(sinkCAT, gst.LevelLog, fmt.Sprintf("Rendering buffer %v", buffer))
if _, err := io.Copy(m.state.wPipe, buffer.Reader()); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Error copying buffer to write pipe", err.Error())
return gst.FlowError
}
return gst.FlowOK
}

View File

@@ -1,4 +1,4 @@
// This example demonstrates a src plugin that reads from objects in a minio bucket. // This example demonstrates a src element that reads from objects in a minio bucket.
// Since minio implements the S3 API this plugin could also be used for S3 buckets by // Since minio implements the S3 API this plugin could also be used for S3 buckets by
// setting the correct endpoints and credentials. // setting the correct endpoints and credentials.
// //
@@ -8,32 +8,14 @@
// //
// In order to build the plugin for use by GStreamer, you can do the following: // In order to build the plugin for use by GStreamer, you can do the following:
// //
// $ go generate
// $ go build -o libgstminiosrc.so -buildmode c-shared . // $ go build -o libgstminiosrc.so -buildmode c-shared .
// //
//
//go:generate gst-plugin-gen
//
// +plugin:Name=miniosrc
// +plugin:Description=GStreamer plugins for reading and writing from Minio
// +plugin:Version=v0.0.1
// +plugin:License=gst.LicenseLGPL
// +plugin:Source=go-gst
// +plugin:Package=examples
// +plugin:Origin=https://github.com/tinyzimmer/go-gst
// +plugin:ReleaseDate=2021-01-11
//
// +element:Name=miniosrc
// +element:Rank=gst.RankNone
// +element:Impl=minioSrc
// +element:Subclass=base.ExtendsBaseSrc
package main package main
import ( import (
"context" "context"
"fmt" "fmt"
"io" "io"
"os"
"sync" "sync"
minio "github.com/minio/minio-go/v7" minio "github.com/minio/minio-go/v7"
@@ -44,28 +26,13 @@ import (
"github.com/tinyzimmer/go-gst/gst/base" "github.com/tinyzimmer/go-gst/gst/base"
) )
func main() {} var srcCAT = gst.NewDebugCategory(
const (
accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID"
secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY"
)
var (
defaultEndpoint = "play.min.io"
defaultUseTLS = true
defaultRegion = "us-east-1"
defaultSeekable = true
)
// CAT is the plugin log category
var CAT = gst.NewDebugCategory(
"miniosrc", "miniosrc",
gst.DebugColorNone, gst.DebugColorNone,
"MinIOSrc Element", "MinIOSrc Element",
) )
var properties = []*gst.ParamSpec{ var srcProperties = []*gst.ParamSpec{
gst.NewStringParam( gst.NewStringParam(
"endpoint", "endpoint",
"S3 API Endpoint", "S3 API Endpoint",
@@ -90,14 +57,14 @@ var properties = []*gst.ParamSpec{
gst.NewStringParam( gst.NewStringParam(
"bucket", "bucket",
"Bucket name", "Bucket name",
"The name of the bucket to fetch the obejct from", "The name of the MinIO bucket",
nil, nil,
gst.ParameterReadWrite, gst.ParameterReadWrite,
), ),
gst.NewStringParam( gst.NewStringParam(
"key", "key",
"Object key", "Object key",
"The key of the object to write to the stream", "The key of the object inside the bucket",
nil, nil,
gst.ParameterReadWrite, gst.ParameterReadWrite,
), ),
@@ -115,38 +82,14 @@ var properties = []*gst.ParamSpec{
nil, nil,
gst.ParameterReadWrite, gst.ParameterReadWrite,
), ),
gst.NewBoolParam(
"seekable",
"Enable Seeking",
"Whether to report that seeking is enabled, disable over slow connections",
defaultSeekable,
gst.ParameterReadWrite,
),
} }
type settings struct { type minioSrc struct {
endpoint string settings *settings
useTLS bool state *srcstate
region string
bucket string
key string
accessKeyID string
secretAccessKey string
seekable bool
} }
func defaultSettings() *settings { type srcstate struct {
return &settings{
endpoint: defaultEndpoint,
useTLS: defaultUseTLS,
region: defaultRegion,
accessKeyID: os.Getenv(accessKeyIDEnvVar),
secretAccessKey: os.Getenv(secretAccessKeyEnvVar),
seekable: defaultSeekable,
}
}
type state struct {
started bool started bool
object *minio.Object object *minio.Object
objInfo minio.ObjectInfo objInfo minio.ObjectInfo
@@ -154,41 +97,36 @@ type state struct {
mux sync.Mutex mux sync.Mutex
} }
type minioSrc struct {
settings *settings
state *state
}
func (m *minioSrc) New() gst.GoElement { func (m *minioSrc) New() gst.GoElement {
CAT.Log(gst.LevelLog, "Creating new minioSrc object") srcCAT.Log(gst.LevelLog, "Creating new minioSrc object")
return &minioSrc{ return &minioSrc{
settings: defaultSettings(), settings: defaultSettings(),
state: &state{}, state: &srcstate{},
} }
} }
func (m *minioSrc) TypeInit(*gst.TypeInstance) {} func (m *minioSrc) TypeInit(*gst.TypeInstance) {}
func (m *minioSrc) ClassInit(klass *gst.ElementClass) { func (m *minioSrc) ClassInit(klass *gst.ElementClass) {
CAT.Log(gst.LevelLog, "Initializing miniosrc class") srcCAT.Log(gst.LevelLog, "Initializing miniosrc class")
klass.SetMetadata( klass.SetMetadata(
"MinIO Source", "MinIO Source",
"Source/File", "Source/File",
"Read stream from a MinIO object", "Read stream from a MinIO object",
"Avi Zimmerman <avi.zimmerman@gmail.com>", "Avi Zimmerman <avi.zimmerman@gmail.com>",
) )
CAT.Log(gst.LevelLog, "Adding src pad template and properties to class") srcCAT.Log(gst.LevelLog, "Adding src pad template and properties to class")
klass.AddPadTemplate(gst.NewPadTemplate( klass.AddPadTemplate(gst.NewPadTemplate(
"src", "src",
gst.PadDirectionSource, gst.PadDirectionSource,
gst.PadPresenceAlways, gst.PadPresenceAlways,
gst.NewAnyCaps(), gst.NewAnyCaps(),
)) ))
klass.InstallProperties(properties) klass.InstallProperties(srcProperties)
} }
func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) { func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) {
prop := properties[id] prop := srcProperties[id]
val, err := value.GoValue() val, err := value.GoValue()
if err != nil { if err != nil {
@@ -211,14 +149,12 @@ func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) {
m.settings.accessKeyID = val.(string) m.settings.accessKeyID = val.(string)
case "secret-access-key": case "secret-access-key":
m.settings.secretAccessKey = val.(string) m.settings.secretAccessKey = val.(string)
case "seekable":
m.settings.seekable = val.(bool)
} }
} }
func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value { func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value {
prop := properties[id] prop := srcProperties[id]
var localVal interface{} var localVal interface{}
@@ -237,8 +173,6 @@ func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value {
localVal = m.settings.accessKeyID localVal = m.settings.accessKeyID
case "secret-access-key": case "secret-access-key":
localVal = m.settings.secretAccessKey localVal = m.settings.secretAccessKey
case "seekable":
localVal = m.settings.seekable
default: default:
gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,
@@ -258,11 +192,11 @@ func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value {
} }
func (m *minioSrc) Constructed(self *gst.Object) { func (m *minioSrc) Constructed(self *gst.Object) {
self.Log(CAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes") self.Log(srcCAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes")
base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes) base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes)
} }
func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return m.settings.seekable } func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return true }
func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) { func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) {
if !m.state.started { if !m.state.started {
@@ -300,7 +234,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
return false return false
} }
self.Log(CAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint)) self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint))
m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{}) m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{})
if err != nil { if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
@@ -308,14 +242,14 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
return false return false
} }
self.Log(CAT, gst.LevelInfo, "Getting HEAD for object") self.Log(srcCAT, gst.LevelInfo, "Getting HEAD for object")
m.state.objInfo, err = m.state.object.Stat() m.state.objInfo, err = m.state.object.Stat()
if err != nil { if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead,
fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "") fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "")
return false return false
} }
self.Log(CAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo)) self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo))
m.state.started = true m.state.started = true
@@ -323,7 +257,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
self.StartComplete(gst.FlowOK) self.StartComplete(gst.FlowOK)
self.Log(CAT, gst.LevelInfo, "MinIOSrc has started") self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has started")
return true return true
} }
@@ -344,7 +278,7 @@ func (m *minioSrc) Stop(self *base.GstBaseSrc) bool {
m.state.object = nil m.state.object = nil
m.state.started = false m.state.started = false
self.Log(CAT, gst.LevelInfo, "MinIOSrc has stopped") self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has stopped")
return true return true
} }
@@ -355,7 +289,7 @@ func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer
return gst.FlowError return gst.FlowError
} }
self.Log(CAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size)) self.Log(srcCAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size))
m.state.mux.Lock() m.state.mux.Lock()
defer m.state.mux.Unlock() defer m.state.mux.Unlock()
@@ -369,7 +303,7 @@ func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer
} }
if read < int(size) { if read < int(size) {
self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read)) self.Log(srcCAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read))
trim := make([]byte, read) trim := make([]byte, read)
copy(trim, data) copy(trim, data)
data = trim data = trim

View File

@@ -0,0 +1,61 @@
package main
import "C"
import (
"unsafe"
"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/base"
)
// The metadata for this plugin
var pluginMeta = &gst.PluginMetadata{
MajorVersion: gst.VersionMajor,
MinorVersion: gst.VersionMinor,
Name: "miniosrc",
Description: "GStreamer plugins for reading and writing from Minio",
Version: "v0.0.1",
License: gst.LicenseLGPL,
Source: "go-gst",
Package: "examples",
Origin: "https://github.com/tinyzimmer/go-gst",
ReleaseDate: "2021-01-11",
// The init function is called to register elements provided by the plugin.
Init: func(plugin *gst.Plugin) bool {
if ok := gst.RegisterElement(
plugin,
// The name of the element
"miniosrc",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&minioSrc{},
// The base subclass this element extends
base.ExtendsBaseSrc,
); !ok {
return ok
}
if ok := gst.RegisterElement(
plugin,
// The name of the element
"miniosink",
// The rank of the element
gst.RankNone,
// The GoElement implementation for the element
&minioSink{},
// The base subclass this element extends
base.ExtendsBaseSink,
); !ok {
return ok
}
return true
},
}
func main() {}
//export gst_plugin_minio_get_desc
func gst_plugin_minio_get_desc() unsafe.Pointer { return pluginMeta.Export() }