working minio plugin

This commit is contained in:
Avi Zimmerman
2021-01-21 20:57:29 +02:00
parent 375f569063
commit 811328136f
8 changed files with 263 additions and 99 deletions

View File

@@ -1,14 +1,12 @@
package main
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
minio "github.com/minio/minio-go/v7"
"github.com/tinyzimmer/go-glib/glib"
"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/base"
@@ -24,12 +22,8 @@ type minioSink struct {
settings *settings
state *sinkstate
rPipe *io.PipeReader
wPipe *io.PipeWriter
doneChan chan struct{}
cancel func()
mux sync.Mutex
writer *seekWriter
mux sync.Mutex
}
type sinkstate struct {
@@ -119,6 +113,7 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool {
m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1])
}
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Creating new MinIO client for %s", m.settings.endpoint))
client, err := getMinIOClient(m.settings)
if err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
@@ -127,31 +122,8 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool {
return false
}
m.doneChan = make(chan struct{})
m.rPipe, m.wPipe = io.Pipe()
var ctx context.Context
ctx, m.cancel = context.WithCancel(context.Background())
go func() {
self.Ref()
defer self.Unref()
defer m.cancel()
defer func() { m.doneChan <- struct{}{} }()
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key))
info, err := client.PutObject(ctx, m.settings.bucket, m.settings.key, m.rPipe, -1, minio.PutObjectOptions{
ContentType: "application/octet-stream",
PartSize: m.settings.partSize,
})
if err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed,
fmt.Sprintf("Failed to do put object request to %s/%s/%s : %s", m.settings.endpoint, m.settings.bucket, m.settings.key, err.Error()), "")
return
}
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("PutObject operation has returned: %+v", info))
}()
self.Log(sinkCAT, gst.LevelInfo, "Initializing new MinIO writer")
m.writer = newSeekWriter(client, int64(m.settings.partSize), m.settings.bucket, m.settings.key)
m.state.started = true
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started")
@@ -168,18 +140,7 @@ func (m *minioSink) Stop(self *base.GstBaseSink) bool {
return false
}
self.Log(sinkCAT, gst.LevelInfo, "Closing write pipe to PutObject operation")
if err := m.wPipe.Close(); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "")
return false
}
self.Log(sinkCAT, gst.LevelInfo, "Blocking until PutObject operation has completed")
<-m.doneChan
self.Log(sinkCAT, gst.LevelInfo, "PutObject operation has completed")
m.rPipe, m.wPipe, m.doneChan, m.cancel = nil, nil, nil, nil
m.writer = nil
m.state.started = false
self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped")
@@ -197,7 +158,7 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR
self.Log(sinkCAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer %v", buffer))
if _, err := m.wPipe.Write(buffer.Bytes()); err != nil {
if _, err := m.writer.Write(buffer.Bytes()); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, fmt.Sprintf("Failed to write data to minio buffer: %s", err.Error()), "")
return gst.FlowError
@@ -205,3 +166,49 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR
return gst.FlowOK
}
func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool {
switch event.Type() {
case gst.EventTypeSegment:
segment := event.ParseSegment()
if segment.GetFormat() == gst.FormatBytes {
if uint64(m.writer.currentPosition) != segment.GetStart() {
self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Seeking to %d", segment.GetStart()))
if _, err := m.writer.Seek(int64(segment.GetStart()), io.SeekStart); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "")
}
} else {
self.Log(sinkCAT, gst.LevelDebug, "Ignored SEGMENT, no seek needed")
}
} else {
self.Log(sinkCAT, gst.LevelDebug, fmt.Sprintf("Ignored SEGMENT event of format %s", segment.GetFormat().String()))
}
case gst.EventTypeFlushStop:
self.Log(sinkCAT, gst.LevelInfo, "Flushing contents of writer and seeking back to start")
if m.writer.currentPosition != 0 {
if err := m.writer.flush(true); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, err.Error(), "")
return false
}
if _, err := m.writer.Seek(0, io.SeekStart); err != nil {
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "")
}
}
case gst.EventTypeEOS:
self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing MinIO writer")
if err := m.writer.Close(); err != nil {
self.Log(sinkCAT, gst.LevelError, err.Error())
self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to close MinIO writer: %s", err.Error()), "")
return false
}
}
return self.ParentEvent(event)
}