diff --git a/examples/plugins/minio/common.go b/examples/plugins/minio/common.go new file mode 100644 index 0000000..3de8b9e --- /dev/null +++ b/examples/plugins/minio/common.go @@ -0,0 +1,36 @@ +package main + +import ( + "os" +) + +const ( + accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID" + secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY" +) + +var ( + defaultEndpoint = "play.min.io" + defaultUseTLS = true + defaultRegion = "us-east-1" +) + +type settings struct { + endpoint string + useTLS bool + region string + bucket string + key string + accessKeyID string + secretAccessKey string +} + +func defaultSettings() *settings { + return &settings{ + endpoint: defaultEndpoint, + useTLS: defaultUseTLS, + region: defaultRegion, + accessKeyID: os.Getenv(accessKeyIDEnvVar), + secretAccessKey: os.Getenv(secretAccessKeyEnvVar), + } +} diff --git a/examples/plugins/minio/miniosink.go b/examples/plugins/minio/miniosink.go new file mode 100644 index 0000000..d04f1fa --- /dev/null +++ b/examples/plugins/minio/miniosink.go @@ -0,0 +1,266 @@ +package main + +import ( + "context" + "fmt" + "io" + "sync" + + minio "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + + "github.com/tinyzimmer/go-glib/glib" + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/base" +) + +var sinkCAT = gst.NewDebugCategory( + "miniosink", + gst.DebugColorNone, + "MinIOSink Element", +) + +var sinkProperties = []*gst.ParamSpec{ + gst.NewStringParam( + "endpoint", + "S3 API Endpoint", + "The endpoint for the S3 API server", + &defaultEndpoint, + gst.ParameterReadWrite, + ), + gst.NewBoolParam( + "use-tls", + "Use TLS", + "Use HTTPS for API requests", + defaultUseTLS, + gst.ParameterReadWrite, + ), + gst.NewStringParam( + "region", + "Bucket region", + "The region where the bucket is", + &defaultRegion, + gst.ParameterReadWrite, + ), + gst.NewStringParam( + "bucket", + "Bucket name", + "The name of the MinIO bucket", + nil, + gst.ParameterReadWrite, + ), + gst.NewStringParam( + "key", + "Object key", + "The key of the object inside the bucket", + nil, + gst.ParameterReadWrite, + ), + gst.NewStringParam( + "access-key-id", + "Access Key ID", + "The access key ID to use for authentication", + nil, + gst.ParameterReadWrite, + ), + gst.NewStringParam( + "secret-access-key", + "Secret Access Key", + "The secret access key to use for authentication", + nil, + gst.ParameterReadWrite, + ), +} + +type minioSink struct { + settings *settings + state *sinkstate +} + +type sinkstate struct { + started bool + rPipe io.ReadCloser + wPipe io.WriteCloser + doneChan chan struct{} + + mux sync.Mutex +} + +func (m *minioSink) New() gst.GoElement { + srcCAT.Log(gst.LevelLog, "Creating new minioSink object") + return &minioSink{ + settings: defaultSettings(), + state: &sinkstate{}, + } +} + +func (m *minioSink) TypeInit(*gst.TypeInstance) {} + +func (m *minioSink) ClassInit(klass *gst.ElementClass) { + sinkCAT.Log(gst.LevelLog, "Initializing miniosink class") + klass.SetMetadata( + "MinIO Sink", + "Sink/File", + "Write stream to a MinIO object", + "Avi Zimmerman ", + ) + sinkCAT.Log(gst.LevelLog, "Adding sink pad template and properties to class") + klass.AddPadTemplate(gst.NewPadTemplate( + "sink", + gst.PadDirectionSink, + gst.PadPresenceAlways, + gst.NewAnyCaps(), + )) + klass.InstallProperties(sinkProperties) +} + +func (m *minioSink) SetProperty(self *gst.Object, id uint, value *glib.Value) { + prop := sinkProperties[id] + + val, err := value.GoValue() + if err != nil { + gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, + fmt.Sprintf("Could not coerce %v to go value", value), err.Error()) + } + + switch prop.Name() { + case "endpoint": + m.settings.endpoint = val.(string) + case "use-tls": + m.settings.useTLS = val.(bool) + case "region": + m.settings.region = val.(string) + case "bucket": + m.settings.bucket = val.(string) + case "key": + m.settings.key = val.(string) + case "access-key-id": + m.settings.accessKeyID = val.(string) + case "secret-access-key": + m.settings.secretAccessKey = val.(string) + } + +} + +func (m *minioSink) GetProperty(self *gst.Object, id uint) *glib.Value { + prop := sinkProperties[id] + + var localVal interface{} + + switch prop.Name() { + case "endpoint": + localVal = m.settings.endpoint + case "use-tls": + localVal = m.settings.useTLS + case "region": + localVal = m.settings.region + case "bucket": + localVal = m.settings.bucket + case "key": + localVal = m.settings.key + case "access-key-id": + localVal = m.settings.accessKeyID + case "secret-access-key": + localVal = m.settings.secretAccessKey + + default: + gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, + fmt.Sprintf("Cannot get invalid property %s", prop.Name()), "") + return nil + } + + val, err := glib.GValue(localVal) + if err != nil { + gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorFailed, + fmt.Sprintf("Could not convert %v to GValue", localVal), + err.Error(), + ) + } + + return val +} + +func (m *minioSink) Start(self *base.GstBaseSink) bool { + if m.state.started { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, + "MinIOSink is already started", "") + return false + } + + if m.settings.bucket == "" { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, + "No bucket configured on the miniosink", "") + return false + } + + if m.settings.key == "" { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, + "No bucket configured on the miniosink", "") + return false + } + + m.state.doneChan = make(chan struct{}) + + client, err := minio.New(m.settings.endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(m.settings.accessKeyID, m.settings.secretAccessKey, ""), + Secure: m.settings.useTLS, + }) + + if err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, + fmt.Sprintf("Failed to connect to MinIO endpoint %s", m.settings.endpoint), err.Error()) + return false + } + + m.state.rPipe, m.state.wPipe = io.Pipe() + + go func() { + self.Log(sinkCAT, gst.LevelInfo, + fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key), + ) + if _, err := client.PutObject(context.Background(), + m.settings.bucket, m.settings.key, + m.state.rPipe, -1, + minio.PutObjectOptions{ + ContentType: "application/octet-stream", + }); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, + fmt.Sprintf("Error during PutObject call to %s/%s", m.settings.bucket, m.settings.key), err.Error()) + } + m.state.doneChan <- struct{}{} + }() + + m.state.started = true + self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started") + return true +} + +func (m *minioSink) Stop(self *base.GstBaseSink) bool { + if !m.state.started { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "") + return false + } + if err := m.state.wPipe.Close(); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Failed to close the write pipe", err.Error()) + return false + } + self.Log(sinkCAT, gst.LevelInfo, "Waiting for PutObject operation to complete") + <-m.state.doneChan + self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped") + return true +} + +func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowReturn { + if !m.state.started { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorSettings, "MinIOSink is not started", "") + return gst.FlowError + } + + self.Log(sinkCAT, gst.LevelLog, fmt.Sprintf("Rendering buffer %v", buffer)) + if _, err := io.Copy(m.state.wPipe, buffer.Reader()); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, "Error copying buffer to write pipe", err.Error()) + return gst.FlowError + } + + return gst.FlowOK +} diff --git a/examples/plugins/miniosrc/miniosrc.go b/examples/plugins/minio/miniosrc.go similarity index 71% rename from examples/plugins/miniosrc/miniosrc.go rename to examples/plugins/minio/miniosrc.go index 8e2d05e..3facc33 100644 --- a/examples/plugins/miniosrc/miniosrc.go +++ b/examples/plugins/minio/miniosrc.go @@ -1,4 +1,4 @@ -// This example demonstrates a src plugin that reads from objects in a minio bucket. +// This example demonstrates a src element that reads from objects in a minio bucket. // Since minio implements the S3 API this plugin could also be used for S3 buckets by // setting the correct endpoints and credentials. // @@ -8,32 +8,14 @@ // // In order to build the plugin for use by GStreamer, you can do the following: // -// $ go generate // $ go build -o libgstminiosrc.so -buildmode c-shared . // -// -//go:generate gst-plugin-gen -// -// +plugin:Name=miniosrc -// +plugin:Description=GStreamer plugins for reading and writing from Minio -// +plugin:Version=v0.0.1 -// +plugin:License=gst.LicenseLGPL -// +plugin:Source=go-gst -// +plugin:Package=examples -// +plugin:Origin=https://github.com/tinyzimmer/go-gst -// +plugin:ReleaseDate=2021-01-11 -// -// +element:Name=miniosrc -// +element:Rank=gst.RankNone -// +element:Impl=minioSrc -// +element:Subclass=base.ExtendsBaseSrc package main import ( "context" "fmt" "io" - "os" "sync" minio "github.com/minio/minio-go/v7" @@ -44,28 +26,13 @@ import ( "github.com/tinyzimmer/go-gst/gst/base" ) -func main() {} - -const ( - accessKeyIDEnvVar = "MINIO_ACCESS_KEY_ID" - secretAccessKeyEnvVar = "MINIO_SECRET_ACCESS_KEY" -) - -var ( - defaultEndpoint = "play.min.io" - defaultUseTLS = true - defaultRegion = "us-east-1" - defaultSeekable = true -) - -// CAT is the plugin log category -var CAT = gst.NewDebugCategory( +var srcCAT = gst.NewDebugCategory( "miniosrc", gst.DebugColorNone, "MinIOSrc Element", ) -var properties = []*gst.ParamSpec{ +var srcProperties = []*gst.ParamSpec{ gst.NewStringParam( "endpoint", "S3 API Endpoint", @@ -90,14 +57,14 @@ var properties = []*gst.ParamSpec{ gst.NewStringParam( "bucket", "Bucket name", - "The name of the bucket to fetch the obejct from", + "The name of the MinIO bucket", nil, gst.ParameterReadWrite, ), gst.NewStringParam( "key", "Object key", - "The key of the object to write to the stream", + "The key of the object inside the bucket", nil, gst.ParameterReadWrite, ), @@ -115,38 +82,14 @@ var properties = []*gst.ParamSpec{ nil, gst.ParameterReadWrite, ), - gst.NewBoolParam( - "seekable", - "Enable Seeking", - "Whether to report that seeking is enabled, disable over slow connections", - defaultSeekable, - gst.ParameterReadWrite, - ), } -type settings struct { - endpoint string - useTLS bool - region string - bucket string - key string - accessKeyID string - secretAccessKey string - seekable bool +type minioSrc struct { + settings *settings + state *srcstate } -func defaultSettings() *settings { - return &settings{ - endpoint: defaultEndpoint, - useTLS: defaultUseTLS, - region: defaultRegion, - accessKeyID: os.Getenv(accessKeyIDEnvVar), - secretAccessKey: os.Getenv(secretAccessKeyEnvVar), - seekable: defaultSeekable, - } -} - -type state struct { +type srcstate struct { started bool object *minio.Object objInfo minio.ObjectInfo @@ -154,41 +97,36 @@ type state struct { mux sync.Mutex } -type minioSrc struct { - settings *settings - state *state -} - func (m *minioSrc) New() gst.GoElement { - CAT.Log(gst.LevelLog, "Creating new minioSrc object") + srcCAT.Log(gst.LevelLog, "Creating new minioSrc object") return &minioSrc{ settings: defaultSettings(), - state: &state{}, + state: &srcstate{}, } } func (m *minioSrc) TypeInit(*gst.TypeInstance) {} func (m *minioSrc) ClassInit(klass *gst.ElementClass) { - CAT.Log(gst.LevelLog, "Initializing miniosrc class") + srcCAT.Log(gst.LevelLog, "Initializing miniosrc class") klass.SetMetadata( "MinIO Source", "Source/File", "Read stream from a MinIO object", "Avi Zimmerman ", ) - CAT.Log(gst.LevelLog, "Adding src pad template and properties to class") + srcCAT.Log(gst.LevelLog, "Adding src pad template and properties to class") klass.AddPadTemplate(gst.NewPadTemplate( "src", gst.PadDirectionSource, gst.PadPresenceAlways, gst.NewAnyCaps(), )) - klass.InstallProperties(properties) + klass.InstallProperties(srcProperties) } func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) { - prop := properties[id] + prop := srcProperties[id] val, err := value.GoValue() if err != nil { @@ -211,14 +149,12 @@ func (m *minioSrc) SetProperty(self *gst.Object, id uint, value *glib.Value) { m.settings.accessKeyID = val.(string) case "secret-access-key": m.settings.secretAccessKey = val.(string) - case "seekable": - m.settings.seekable = val.(bool) } } func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value { - prop := properties[id] + prop := srcProperties[id] var localVal interface{} @@ -237,8 +173,6 @@ func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value { localVal = m.settings.accessKeyID case "secret-access-key": localVal = m.settings.secretAccessKey - case "seekable": - localVal = m.settings.seekable default: gst.ToElement(self).ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, @@ -258,11 +192,11 @@ func (m *minioSrc) GetProperty(self *gst.Object, id uint) *glib.Value { } func (m *minioSrc) Constructed(self *gst.Object) { - self.Log(CAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes") + self.Log(srcCAT, gst.LevelLog, "Setting format of GstBaseSrc to bytes") base.ToGstBaseSrc(self).SetFormat(gst.FormatBytes) } -func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return m.settings.seekable } +func (m *minioSrc) IsSeekable(*base.GstBaseSrc) bool { return true } func (m *minioSrc) GetSize(self *base.GstBaseSrc) (bool, int64) { if !m.state.started { @@ -300,7 +234,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool { return false } - self.Log(CAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint)) + self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("Requesting %s/%s from %s", m.settings.bucket, m.settings.key, m.settings.endpoint)) m.state.object, err = client.GetObject(context.Background(), m.settings.bucket, m.settings.key, minio.GetObjectOptions{}) if err != nil { self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, @@ -308,14 +242,14 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool { return false } - self.Log(CAT, gst.LevelInfo, "Getting HEAD for object") + self.Log(srcCAT, gst.LevelInfo, "Getting HEAD for object") m.state.objInfo, err = m.state.object.Stat() if err != nil { self.ErrorMessage(gst.DomainResource, gst.ResourceErrorOpenRead, fmt.Sprintf("Failed to stat object %q in bucket %q: %s", m.settings.key, m.settings.bucket, err.Error()), "") return false } - self.Log(CAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo)) + self.Log(srcCAT, gst.LevelInfo, fmt.Sprintf("%+v", m.state.objInfo)) m.state.started = true @@ -323,7 +257,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool { self.StartComplete(gst.FlowOK) - self.Log(CAT, gst.LevelInfo, "MinIOSrc has started") + self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has started") return true } @@ -344,7 +278,7 @@ func (m *minioSrc) Stop(self *base.GstBaseSrc) bool { m.state.object = nil m.state.started = false - self.Log(CAT, gst.LevelInfo, "MinIOSrc has stopped") + self.Log(srcCAT, gst.LevelInfo, "MinIOSrc has stopped") return true } @@ -355,7 +289,7 @@ func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer return gst.FlowError } - self.Log(CAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size)) + self.Log(srcCAT, gst.LevelLog, fmt.Sprintf("Request to fill buffer from offset %v with size %v", offset, size)) m.state.mux.Lock() defer m.state.mux.Unlock() @@ -369,7 +303,7 @@ func (m *minioSrc) Fill(self *base.GstBaseSrc, offset uint64, size uint, buffer } if read < int(size) { - self.Log(CAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read)) + self.Log(srcCAT, gst.LevelDebug, fmt.Sprintf("Only read %d bytes from object, trimming", read)) trim := make([]byte, read) copy(trim, data) data = trim diff --git a/examples/plugins/minio/plugin.go b/examples/plugins/minio/plugin.go new file mode 100644 index 0000000..f14970d --- /dev/null +++ b/examples/plugins/minio/plugin.go @@ -0,0 +1,61 @@ +package main + +import "C" + +import ( + "unsafe" + + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/base" +) + +// The metadata for this plugin +var pluginMeta = &gst.PluginMetadata{ + MajorVersion: gst.VersionMajor, + MinorVersion: gst.VersionMinor, + Name: "miniosrc", + Description: "GStreamer plugins for reading and writing from Minio", + Version: "v0.0.1", + License: gst.LicenseLGPL, + Source: "go-gst", + Package: "examples", + Origin: "https://github.com/tinyzimmer/go-gst", + ReleaseDate: "2021-01-11", + // The init function is called to register elements provided by the plugin. + Init: func(plugin *gst.Plugin) bool { + if ok := gst.RegisterElement( + plugin, + // The name of the element + "miniosrc", + // The rank of the element + gst.RankNone, + // The GoElement implementation for the element + &minioSrc{}, + // The base subclass this element extends + base.ExtendsBaseSrc, + ); !ok { + return ok + } + + if ok := gst.RegisterElement( + plugin, + // The name of the element + "miniosink", + // The rank of the element + gst.RankNone, + // The GoElement implementation for the element + &minioSink{}, + // The base subclass this element extends + base.ExtendsBaseSink, + ); !ok { + return ok + } + + return true + }, +} + +func main() {} + +//export gst_plugin_minio_get_desc +func gst_plugin_minio_get_desc() unsafe.Pointer { return pluginMeta.Export() }