From 4fd15b6a7a1aa2a0c1bb8c7e9b608b87f4cd08c4 Mon Sep 17 00:00:00 2001 From: Avi Zimmerman Date: Thu, 21 Jan 2021 10:01:06 +0200 Subject: [PATCH] gst_init finally works and pops correctly --- examples/launch/main.go | 5 +++-- examples/plugins/minio/common.go | 6 ++++++ examples/plugins/minio/miniosink.go | 26 +++++++++++--------------- examples/plugins/minio/miniosrc.go | 1 + examples/plugins/minio/properties.go | 12 ++++++++++++ gst/gst_init.go | 28 +++++++++++++++++----------- 6 files changed, 50 insertions(+), 28 deletions(-) diff --git a/examples/launch/main.go b/examples/launch/main.go index 58eb6f1..ccb6371 100644 --- a/examples/launch/main.go +++ b/examples/launch/main.go @@ -15,8 +15,8 @@ import ( ) func runPipeline(mainLoop *glib.MainLoop) error { - gst.Init(nil) - + gst.Init(&os.Args) + fmt.Println(os.Args) if len(os.Args) == 1 { return errors.New("Pipeline string cannot be empty") } @@ -34,6 +34,7 @@ func runPipeline(mainLoop *glib.MainLoop) error { pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { switch msg.Type() { case gst.MessageEOS: // When end-of-stream is received stop the main loop + pipeline.BlockSetState(gst.StateNull) mainLoop.Quit() case gst.MessageError: // Error messages are always fatal err := msg.ParseError() diff --git a/examples/plugins/minio/common.go b/examples/plugins/minio/common.go index b605194..d07300f 100644 --- a/examples/plugins/minio/common.go +++ b/examples/plugins/minio/common.go @@ -36,6 +36,7 @@ type settings struct { secretAccessKey string insecureSkipVerify bool caCertFile string + partSize uint64 } func (s *settings) safestring() string { @@ -58,6 +59,7 @@ func defaultSettings() *settings { accessKeyID: os.Getenv(accessKeyIDEnvVar), secretAccessKey: os.Getenv(secretAccessKeyEnvVar), insecureSkipVerify: defaultInsecureSkipVerify, + partSize: defaultPartSize, } } @@ -114,6 +116,8 @@ func setProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *sett settings.accessKeyID = val.(string) case "secret-access-key": settings.secretAccessKey = val.(string) + case "part-size": + settings.partSize = val.(uint64) } } @@ -141,6 +145,8 @@ func getProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *sett localVal = settings.accessKeyID case "secret-access-key": localVal = "" + case "part-size": + localVal = settings.partSize default: elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, diff --git a/examples/plugins/minio/miniosink.go b/examples/plugins/minio/miniosink.go index 575fe37..5e82469 100644 --- a/examples/plugins/minio/miniosink.go +++ b/examples/plugins/minio/miniosink.go @@ -133,12 +133,15 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool { ctx, m.cancel = context.WithCancel(context.Background()) go func() { + self.Ref() + defer self.Unref() defer m.cancel() defer func() { m.doneChan <- struct{}{} }() self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key)) info, err := client.PutObject(ctx, m.settings.bucket, m.settings.key, m.rPipe, -1, minio.PutObjectOptions{ ContentType: "application/octet-stream", + PartSize: m.settings.partSize, }) if err != nil { @@ -156,6 +159,7 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool { } func (m *minioSink) Stop(self *base.GstBaseSink) bool { + self.Log(sinkCAT, gst.LevelInfo, "Stopping MinIOSink") m.mux.Lock() defer m.mux.Unlock() @@ -164,6 +168,13 @@ func (m *minioSink) Stop(self *base.GstBaseSink) bool { return false } + self.Log(sinkCAT, gst.LevelInfo, "Closing write pipe to PutObject operation") + if err := m.wPipe.Close(); err != nil { + self.Log(sinkCAT, gst.LevelError, err.Error()) + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "") + return false + } + self.Log(sinkCAT, gst.LevelInfo, "Blocking until PutObject operation has completed") <-m.doneChan self.Log(sinkCAT, gst.LevelInfo, "PutObject operation has completed") @@ -194,18 +205,3 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR return gst.FlowOK } - -func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool { - switch event.Type() { - case gst.EventTypeEOS: - m.mux.Lock() - defer m.mux.Unlock() - self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing write pipe to PutObject operation") - if err := m.wPipe.Close(); err != nil { - self.Log(sinkCAT, gst.LevelError, err.Error()) - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "") - return false - } - } - return self.ParentEvent(event) -} diff --git a/examples/plugins/minio/miniosrc.go b/examples/plugins/minio/miniosrc.go index 640f900..5c177f6 100644 --- a/examples/plugins/minio/miniosrc.go +++ b/examples/plugins/minio/miniosrc.go @@ -149,6 +149,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool { } func (m *minioSrc) Stop(self *base.GstBaseSrc) bool { + self.Log(srcCAT, gst.LevelInfo, "Stopping MinIOSrc") m.state.mux.Lock() defer m.state.mux.Unlock() diff --git a/examples/plugins/minio/properties.go b/examples/plugins/minio/properties.go index 3b12377..c88cd09 100644 --- a/examples/plugins/minio/properties.go +++ b/examples/plugins/minio/properties.go @@ -1,6 +1,8 @@ package main import ( + "math" + "github.com/tinyzimmer/go-glib/glib" ) @@ -8,6 +10,9 @@ import ( // This is because the GType system doesn't allow for GObjects to share pointers // to the exact same GParamSpecs. +const defaultPartSize = 1024 * 1024 * 128 +const minPartSize = 1024 * 1024 * 5 + var sinkProperties = []*glib.ParamSpec{ glib.NewStringParam( "endpoint", @@ -72,6 +77,13 @@ var sinkProperties = []*glib.ParamSpec{ nil, glib.ParameterReadWrite, ), + glib.NewUint64Param( + "part-size", + "Part Size", + "Size for each part in the multi-part upload", + minPartSize, math.MaxInt64, defaultPartSize, + glib.ParameterReadWrite, + ), } var srcProperties = []*glib.ParamSpec{ diff --git a/gst/gst_init.go b/gst/gst_init.go index 9959981..c2ea8cd 100644 --- a/gst/gst_init.go +++ b/gst/gst_init.go @@ -1,6 +1,12 @@ package gst -// #include "gst.go.h" +/* +#include "gst.go.h" + +static void* allocArgv(int argc) { + return malloc(sizeof(char *) * argc); +} +*/ import "C" import "unsafe" @@ -21,21 +27,21 @@ var CAT *DebugCategory // plugins it is generally better to initialize your own DebugCategory. func Init(args *[]string) { if args != nil { - argc := C.int(len(*args)) - argv := make([]*C.char, argc) + cargc := C.int(len(*args)) + cargv := (*[0xfff]*C.char)(C.allocArgv(cargc)) + defer C.free(unsafe.Pointer(cargv)) for i, arg := range *args { - argv[i] = C.CString(arg) + cargv[i] = C.CString(arg) + defer C.free(unsafe.Pointer(cargv[i])) } - C.gst_init((*C.int)(unsafe.Pointer(&argc)), - (***C.char)(unsafe.Pointer(&argv))) - unhandled := make([]string, argc) - for i := 0; i < int(argc); i++ { - unhandled[i] = C.GoString(argv[i]) - C.free(unsafe.Pointer(argv[i])) + C.gst_init(&cargc, (***C.char)(unsafe.Pointer(&cargv))) + unhandled := make([]string, cargc) + for i := 0; i < int(cargc); i++ { + unhandled[i] = C.GoString(cargv[i]) } *args = unhandled } else { C.gst_init(nil, nil) } - CAT = NewDebugCategory("GST_GO", DebugColorNone, "GStreamer Go Bindings") + CAT = NewDebugCategory("GST_GO", DebugColorFgCyan, "GStreamer Go Bindings") }