gst_init finally works and pops correctly

This commit is contained in:
Avi Zimmerman
2021-01-21 10:01:06 +02:00
parent a4741af5a5
commit 4fd15b6a7a
6 changed files with 50 additions and 28 deletions

View File

@@ -15,8 +15,8 @@ import (
) )
func runPipeline(mainLoop *glib.MainLoop) error { func runPipeline(mainLoop *glib.MainLoop) error {
gst.Init(nil) gst.Init(&os.Args)
fmt.Println(os.Args)
if len(os.Args) == 1 { if len(os.Args) == 1 {
return errors.New("Pipeline string cannot be empty") return errors.New("Pipeline string cannot be empty")
} }
@@ -34,6 +34,7 @@ func runPipeline(mainLoop *glib.MainLoop) error {
pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
switch msg.Type() { switch msg.Type() {
case gst.MessageEOS: // When end-of-stream is received stop the main loop case gst.MessageEOS: // When end-of-stream is received stop the main loop
pipeline.BlockSetState(gst.StateNull)
mainLoop.Quit() mainLoop.Quit()
case gst.MessageError: // Error messages are always fatal case gst.MessageError: // Error messages are always fatal
err := msg.ParseError() err := msg.ParseError()

View File

@@ -36,6 +36,7 @@ type settings struct {
secretAccessKey string secretAccessKey string
insecureSkipVerify bool insecureSkipVerify bool
caCertFile string caCertFile string
partSize uint64
} }
func (s *settings) safestring() string { func (s *settings) safestring() string {
@@ -58,6 +59,7 @@ func defaultSettings() *settings {
accessKeyID: os.Getenv(accessKeyIDEnvVar), accessKeyID: os.Getenv(accessKeyIDEnvVar),
secretAccessKey: os.Getenv(secretAccessKeyEnvVar), secretAccessKey: os.Getenv(secretAccessKeyEnvVar),
insecureSkipVerify: defaultInsecureSkipVerify, insecureSkipVerify: defaultInsecureSkipVerify,
partSize: defaultPartSize,
} }
} }
@@ -114,6 +116,8 @@ func setProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *sett
settings.accessKeyID = val.(string) settings.accessKeyID = val.(string)
case "secret-access-key": case "secret-access-key":
settings.secretAccessKey = val.(string) settings.secretAccessKey = val.(string)
case "part-size":
settings.partSize = val.(uint64)
} }
} }
@@ -141,6 +145,8 @@ func getProperty(elem *gst.Element, properties []*glib.ParamSpec, settings *sett
localVal = settings.accessKeyID localVal = settings.accessKeyID
case "secret-access-key": case "secret-access-key":
localVal = "<private>" localVal = "<private>"
case "part-size":
localVal = settings.partSize
default: default:
elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings, elem.ErrorMessage(gst.DomainLibrary, gst.LibraryErrorSettings,

View File

@@ -133,12 +133,15 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool {
ctx, m.cancel = context.WithCancel(context.Background()) ctx, m.cancel = context.WithCancel(context.Background())
go func() { go func() {
self.Ref()
defer self.Unref()
defer m.cancel() defer m.cancel()
defer func() { m.doneChan <- struct{}{} }() defer func() { m.doneChan <- struct{}{} }()
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key)) self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key))
info, err := client.PutObject(ctx, m.settings.bucket, m.settings.key, m.rPipe, -1, minio.PutObjectOptions{ info, err := client.PutObject(ctx, m.settings.bucket, m.settings.key, m.rPipe, -1, minio.PutObjectOptions{
ContentType: "application/octet-stream", ContentType: "application/octet-stream",
PartSize: m.settings.partSize,
}) })
if err != nil { if err != nil {
@@ -156,6 +159,7 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool {
} }
func (m *minioSink) Stop(self *base.GstBaseSink) bool { func (m *minioSink) Stop(self *base.GstBaseSink) bool {
self.Log(sinkCAT, gst.LevelInfo, "Stopping MinIOSink")
m.mux.Lock() m.mux.Lock()
defer m.mux.Unlock() defer m.mux.Unlock()
@@ -164,6 +168,13 @@ func (m *minioSink) Stop(self *base.GstBaseSink) bool {
return false return false
} }
self.Log(sinkCAT, gst.LevelInfo, "Closing write pipe to PutObject operation")
if err := m.wPipe.Close(); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "")
return false
}
self.Log(sinkCAT, gst.LevelInfo, "Blocking until PutObject operation has completed") self.Log(sinkCAT, gst.LevelInfo, "Blocking until PutObject operation has completed")
<-m.doneChan <-m.doneChan
self.Log(sinkCAT, gst.LevelInfo, "PutObject operation has completed") self.Log(sinkCAT, gst.LevelInfo, "PutObject operation has completed")
@@ -194,18 +205,3 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR
return gst.FlowOK return gst.FlowOK
} }
func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool {
switch event.Type() {
case gst.EventTypeEOS:
m.mux.Lock()
defer m.mux.Unlock()
self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing write pipe to PutObject operation")
if err := m.wPipe.Close(); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "")
return false
}
}
return self.ParentEvent(event)
}

View File

@@ -149,6 +149,7 @@ func (m *minioSrc) Start(self *base.GstBaseSrc) bool {
} }
func (m *minioSrc) Stop(self *base.GstBaseSrc) bool { func (m *minioSrc) Stop(self *base.GstBaseSrc) bool {
self.Log(srcCAT, gst.LevelInfo, "Stopping MinIOSrc")
m.state.mux.Lock() m.state.mux.Lock()
defer m.state.mux.Unlock() defer m.state.mux.Unlock()

View File

@@ -1,6 +1,8 @@
package main package main
import ( import (
"math"
"github.com/tinyzimmer/go-glib/glib" "github.com/tinyzimmer/go-glib/glib"
) )
@@ -8,6 +10,9 @@ import (
// This is because the GType system doesn't allow for GObjects to share pointers // This is because the GType system doesn't allow for GObjects to share pointers
// to the exact same GParamSpecs. // to the exact same GParamSpecs.
const defaultPartSize = 1024 * 1024 * 128
const minPartSize = 1024 * 1024 * 5
var sinkProperties = []*glib.ParamSpec{ var sinkProperties = []*glib.ParamSpec{
glib.NewStringParam( glib.NewStringParam(
"endpoint", "endpoint",
@@ -72,6 +77,13 @@ var sinkProperties = []*glib.ParamSpec{
nil, nil,
glib.ParameterReadWrite, glib.ParameterReadWrite,
), ),
glib.NewUint64Param(
"part-size",
"Part Size",
"Size for each part in the multi-part upload",
minPartSize, math.MaxInt64, defaultPartSize,
glib.ParameterReadWrite,
),
} }
var srcProperties = []*glib.ParamSpec{ var srcProperties = []*glib.ParamSpec{

View File

@@ -1,6 +1,12 @@
package gst package gst
// #include "gst.go.h" /*
#include "gst.go.h"
static void* allocArgv(int argc) {
return malloc(sizeof(char *) * argc);
}
*/
import "C" import "C"
import "unsafe" import "unsafe"
@@ -21,21 +27,21 @@ var CAT *DebugCategory
// plugins it is generally better to initialize your own DebugCategory. // plugins it is generally better to initialize your own DebugCategory.
func Init(args *[]string) { func Init(args *[]string) {
if args != nil { if args != nil {
argc := C.int(len(*args)) cargc := C.int(len(*args))
argv := make([]*C.char, argc) cargv := (*[0xfff]*C.char)(C.allocArgv(cargc))
defer C.free(unsafe.Pointer(cargv))
for i, arg := range *args { for i, arg := range *args {
argv[i] = C.CString(arg) cargv[i] = C.CString(arg)
defer C.free(unsafe.Pointer(cargv[i]))
} }
C.gst_init((*C.int)(unsafe.Pointer(&argc)), C.gst_init(&cargc, (***C.char)(unsafe.Pointer(&cargv)))
(***C.char)(unsafe.Pointer(&argv))) unhandled := make([]string, cargc)
unhandled := make([]string, argc) for i := 0; i < int(cargc); i++ {
for i := 0; i < int(argc); i++ { unhandled[i] = C.GoString(cargv[i])
unhandled[i] = C.GoString(argv[i])
C.free(unsafe.Pointer(argv[i]))
} }
*args = unhandled *args = unhandled
} else { } else {
C.gst_init(nil, nil) C.gst_init(nil, nil)
} }
CAT = NewDebugCategory("GST_GO", DebugColorNone, "GStreamer Go Bindings") CAT = NewDebugCategory("GST_GO", DebugColorFgCyan, "GStreamer Go Bindings")
} }