diff --git a/.github/Dockerfile b/.github/Dockerfile index fc6ed0d..27e3bb9 100644 --- a/.github/Dockerfile +++ b/.github/Dockerfile @@ -14,27 +14,13 @@ # ENTRYPOINT ["/app"] # ARG GO_VERSION=1.15 -FROM golang:${GO_VERSION} +FROM ubuntu:20.10 -RUN apt-get update \ - && DEBIAN_FRONTEND=noninteractive apt-get install -y cmake \ - build-essential python3 python3-pip glib2.0 glib2.0-dev libssl-dev bison flex liborc-dev ffmpeg libegl1-mesa \ - libavcodec-dev libavfilter-dev libavformat-dev libavresample-dev libavutil-dev libpciaccess-dev libegl1-mesa-dev \ - libpng-tools libpng-dev libjpeg-dev libx264-dev nasm libgtest-dev libnice-dev libdrm-dev gobject-introspection \ - libgirepository1.0-dev gir1.2-glib-2.0 libcairo-dev gtk3.0-dev liblcms2-dev liblcms2-2 liborc-0.4-dev liborc-0.4 \ - && pip3 install meson ninja \ - && rm -rf /var/lib/apt/lists/* - -RUN mkdir /gstreamer \ - && cd /gstreamer \ - && git clone https://gitlab.freedesktop.org/gstreamer/gst-build.git \ - && cd gst-build \ - && meson setup build \ - && cd build \ - && meson compile \ - && meson install \ - && cd .. \ - && rm -rf build/ - -RUN mkdir -p /workspace -WORKDIR /workspace \ No newline at end of file +RUN mkdir -p /build \ + && apt-get update \ + && DEBIAN_FRONTEND=noninteractive apt-get install -y \ + golang git make curl \ + libgstreamer1.0 libgstreamer1.0-dev \ + libgstreamer-plugins-bad1.0-dev libgstreamer-plugins-base1.0-dev \ + gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 16aa558..0000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,21 +0,0 @@ -name: Build - -on: - push: - branches: [ main ] - -jobs: - - setup: - name: Build - runs-on: ubuntu-20.04 - steps: - - - name: Check out code into the Go module directory - uses: actions/checkout@v2 - - - name: Login to container reigstry - run: echo ${{ secrets.GHCR_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin - - - name: Build and push the go-gst docker image - run: make docker-build docker-push diff --git a/Makefile b/Makefile index f6f0e59..6ea1bc8 100644 --- a/Makefile +++ b/Makefile @@ -4,12 +4,12 @@ DOCKER_IMAGE ?= ghcr.io/tinyzimmer/go-gst:$(GO_VERSION) GOPATH ?= $(shell go env GOPATH) GOBIN ?= $(GOPATH)/bin GOLANGCI_VERSION ?= v1.33.0 -GOLANGCI_LINT ?= $(GOBIN)/golangci-lint +GOLANGCI_LINT ?= $(GOBIN)/golangci-lint PLUGIN_GEN ?= "$(shell go env GOPATH)/bin/gst-plugin-gen" $(GOLANGCI_LINT): - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOBIN) $(GOLANGCI_VERSION) + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOBIN) $(GOLANGCI_VERSION) lint: $(GOLANGCI_LINT) $(GOLANGCI_LINT) run -v @@ -20,16 +20,14 @@ docker-build: --build-arg GO_VERSION=$(GO_VERSION) \ -t $(DOCKER_IMAGE) -docker-push: docker-build - docker push $(DOCKER_IMAGE) - CMD ?= /bin/bash -docker-run: +docker-run: docker-build docker run --rm --privileged \ -v /lib/modules:/lib/modules:ro \ -v /sys:/sys:ro \ -v /usr/src:/usr/src:ro \ -v "$(PWD)":/workspace \ + -w /workspace \ -e HOME=/tmp \ $(DOCKER_IMAGE) $(CMD) diff --git a/README.md b/README.md index 85d04e1..27df787 100644 --- a/README.md +++ b/README.md @@ -49,8 +49,10 @@ func main() { os.Exit(1) } - // Initialize GStreamer - gst.Init(nil) + // Initialize GStreamer with the arguments passed to the program. Gstreamer + // and the bindings will automatically pop off any handled arguments leaving + // nothing but a pipeline string (unless other invalid args are present). + gst.Init(&os.Args) // Create a main loop. This is only required when utilizing signals via the bindings. // In this example, the AddWatch on the pipeline bus requires iterating on the main loop. diff --git a/examples/plugins/minio/go.mod b/examples/plugins/minio/go.mod index f1d325d..ec1db0e 100644 --- a/examples/plugins/minio/go.mod +++ b/examples/plugins/minio/go.mod @@ -5,5 +5,5 @@ go 1.15 require ( github.com/minio/minio-go/v7 v7.0.7 github.com/tinyzimmer/go-glib v0.0.18 - github.com/tinyzimmer/go-gst v0.2.8 + github.com/tinyzimmer/go-gst v0.2.11 ) diff --git a/examples/plugins/minio/go.sum b/examples/plugins/minio/go.sum index 22b8ba3..954f947 100644 --- a/examples/plugins/minio/go.sum +++ b/examples/plugins/minio/go.sum @@ -70,6 +70,8 @@ github.com/tinyzimmer/go-gst v0.2.4 h1:uDGTzObBmIhyukqjCE9Jw0/EmmNU47Ztd5lBrtXTm github.com/tinyzimmer/go-gst v0.2.4/go.mod h1:aPV2CtdfNrtASAzj+DzrAISJr1Czfy25ihLJIh7f/tk= github.com/tinyzimmer/go-gst v0.2.8 h1:l0O9IjxncP7TMeeDFfYeQjrmsDv4STE0j8gVU1N8J74= github.com/tinyzimmer/go-gst v0.2.8/go.mod h1:C1yElEfXm8k0ddR4NdT1cJS4vFHv2wyVrIBSJCB6Nto= +github.com/tinyzimmer/go-gst v0.2.11 h1:Nfaz7k0L2stRrSGhdGyZbfbbCUMU6/zC0UBi8Ftt8S0= +github.com/tinyzimmer/go-gst v0.2.11/go.mod h1:C1yElEfXm8k0ddR4NdT1cJS4vFHv2wyVrIBSJCB6Nto= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/examples/plugins/minio/miniosink.go b/examples/plugins/minio/miniosink.go index 5e82469..227bf11 100644 --- a/examples/plugins/minio/miniosink.go +++ b/examples/plugins/minio/miniosink.go @@ -1,14 +1,12 @@ package main import ( - "context" "fmt" "io" "os" "strings" "sync" - minio "github.com/minio/minio-go/v7" "github.com/tinyzimmer/go-glib/glib" "github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst/base" @@ -24,12 +22,8 @@ type minioSink struct { settings *settings state *sinkstate - rPipe *io.PipeReader - wPipe *io.PipeWriter - doneChan chan struct{} - cancel func() - - mux sync.Mutex + writer *seekWriter + mux sync.Mutex } type sinkstate struct { @@ -119,6 +113,7 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool { m.settings.secretAccessKey = os.Getenv(spl[len(spl)-1]) } + self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Creating new MinIO client for %s", m.settings.endpoint)) client, err := getMinIOClient(m.settings) if err != nil { self.Log(sinkCAT, gst.LevelError, err.Error()) @@ -127,31 +122,8 @@ func (m *minioSink) Start(self *base.GstBaseSink) bool { return false } - m.doneChan = make(chan struct{}) - m.rPipe, m.wPipe = io.Pipe() - var ctx context.Context - ctx, m.cancel = context.WithCancel(context.Background()) - - go func() { - self.Ref() - defer self.Unref() - defer m.cancel() - defer func() { m.doneChan <- struct{}{} }() - - self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Starting PutObject operation to %s/%s/%s", m.settings.endpoint, m.settings.bucket, m.settings.key)) - info, err := client.PutObject(ctx, m.settings.bucket, m.settings.key, m.rPipe, -1, minio.PutObjectOptions{ - ContentType: "application/octet-stream", - PartSize: m.settings.partSize, - }) - - if err != nil { - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, - fmt.Sprintf("Failed to do put object request to %s/%s/%s : %s", m.settings.endpoint, m.settings.bucket, m.settings.key, err.Error()), "") - return - } - - self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("PutObject operation has returned: %+v", info)) - }() + self.Log(sinkCAT, gst.LevelInfo, "Initializing new MinIO writer") + m.writer = newSeekWriter(client, int64(m.settings.partSize), m.settings.bucket, m.settings.key) m.state.started = true self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has started") @@ -168,18 +140,7 @@ func (m *minioSink) Stop(self *base.GstBaseSink) bool { return false } - self.Log(sinkCAT, gst.LevelInfo, "Closing write pipe to PutObject operation") - if err := m.wPipe.Close(); err != nil { - self.Log(sinkCAT, gst.LevelError, err.Error()) - self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to finalize MinIO object: %s", err.Error()), "") - return false - } - - self.Log(sinkCAT, gst.LevelInfo, "Blocking until PutObject operation has completed") - <-m.doneChan - self.Log(sinkCAT, gst.LevelInfo, "PutObject operation has completed") - - m.rPipe, m.wPipe, m.doneChan, m.cancel = nil, nil, nil, nil + m.writer = nil m.state.started = false self.Log(sinkCAT, gst.LevelInfo, "MinIOSink has stopped") @@ -197,7 +158,7 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR self.Log(sinkCAT, gst.LevelTrace, fmt.Sprintf("Rendering buffer %v", buffer)) - if _, err := m.wPipe.Write(buffer.Bytes()); err != nil { + if _, err := m.writer.Write(buffer.Bytes()); err != nil { self.Log(sinkCAT, gst.LevelError, err.Error()) self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, fmt.Sprintf("Failed to write data to minio buffer: %s", err.Error()), "") return gst.FlowError @@ -205,3 +166,49 @@ func (m *minioSink) Render(self *base.GstBaseSink, buffer *gst.Buffer) gst.FlowR return gst.FlowOK } + +func (m *minioSink) Event(self *base.GstBaseSink, event *gst.Event) bool { + + switch event.Type() { + + case gst.EventTypeSegment: + segment := event.ParseSegment() + + if segment.GetFormat() == gst.FormatBytes { + if uint64(m.writer.currentPosition) != segment.GetStart() { + self.Log(sinkCAT, gst.LevelInfo, fmt.Sprintf("Seeking to %d", segment.GetStart())) + if _, err := m.writer.Seek(int64(segment.GetStart()), io.SeekStart); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "") + } + } else { + self.Log(sinkCAT, gst.LevelDebug, "Ignored SEGMENT, no seek needed") + } + } else { + self.Log(sinkCAT, gst.LevelDebug, fmt.Sprintf("Ignored SEGMENT event of format %s", segment.GetFormat().String())) + } + + case gst.EventTypeFlushStop: + self.Log(sinkCAT, gst.LevelInfo, "Flushing contents of writer and seeking back to start") + if m.writer.currentPosition != 0 { + if err := m.writer.flush(true); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorWrite, err.Error(), "") + return false + } + if _, err := m.writer.Seek(0, io.SeekStart); err != nil { + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorFailed, err.Error(), "") + } + } + + case gst.EventTypeEOS: + self.Log(sinkCAT, gst.LevelInfo, "Received EOS, closing MinIO writer") + if err := m.writer.Close(); err != nil { + self.Log(sinkCAT, gst.LevelError, err.Error()) + self.ErrorMessage(gst.DomainResource, gst.ResourceErrorClose, fmt.Sprintf("Failed to close MinIO writer: %s", err.Error()), "") + return false + } + + } + + return self.ParentEvent(event) + +} diff --git a/examples/plugins/minio/seek_writer.go b/examples/plugins/minio/seek_writer.go new file mode 100644 index 0000000..0dd45be --- /dev/null +++ b/examples/plugins/minio/seek_writer.go @@ -0,0 +1,190 @@ +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "fmt" + "io/ioutil" + "path" + + minio "github.com/minio/minio-go/v7" +) + +type seekWriter struct { + // The current position in the buffer + currentPosition int64 + // The size of each part to upload + partSize int64 + // A map of in memory parts to their content + parts map[int64][]byte + // A map of uploaded parts to the checksum at time of upload + uploadedParts map[int64]string + // A local reference to the minio client + client *minio.Client + bucket, key string +} + +func newSeekWriter(client *minio.Client, partsize int64, bucket, key string) *seekWriter { + return &seekWriter{ + currentPosition: 0, + partSize: partsize, + parts: make(map[int64][]byte), + uploadedParts: make(map[int64]string), + client: client, + bucket: bucket, key: key, + } +} + +func (s *seekWriter) Write(p []byte) (int, error) { + wrote, err := s.buffer(0, p) + if err != nil { + return wrote, err + } + return wrote, s.flush(false) +} + +func (s *seekWriter) Seek(offset int64, whence int) (int64, error) { + // Only needs to support SeekStart + s.currentPosition = offset + return s.currentPosition, nil +} + +func (s *seekWriter) Close() error { + if err := s.flush(true); err != nil { + return err + } + if len(s.uploadedParts) == 0 { + return nil + } + opts := make([]minio.CopySrcOptions, len(s.uploadedParts)) + for i := 0; i < len(opts); i++ { + opts[i] = minio.CopySrcOptions{ + Bucket: s.bucket, + Object: s.keyForPart(int64(i)), + } + } + _, err := s.client.ComposeObject(context.Background(), minio.CopyDestOptions{ + Bucket: s.bucket, + Object: s.key, + }, opts...) + + if err != nil { + return err + } + for _, opt := range opts { + if err := s.client.RemoveObject(context.Background(), opt.Bucket, opt.Object, minio.RemoveObjectOptions{}); err != nil { + return err + } + } + + return nil +} + +func (s *seekWriter) buffer(from int, p []byte) (int, error) { + currentPart := s.currentPosition / s.partSize + writeat := s.currentPosition % s.partSize + lenToWrite := int64(len(p)) + + var buf []byte + var ok bool + if buf, ok = s.parts[currentPart]; !ok { + if _, ok := s.uploadedParts[currentPart]; !ok { + s.parts[currentPart] = make([]byte, writeat+lenToWrite) + buf = s.parts[currentPart] + } else { + var err error + buf, err = s.fetchRemotePart(currentPart) + if err != nil { + return from, err + } + } + } + + if lenToWrite+writeat > s.partSize { + newbuf := make([]byte, s.partSize) + copy(newbuf, buf) + s.parts[currentPart] = newbuf + buf = newbuf + } else if lenToWrite+writeat > int64(len(buf)) { + newbuf := make([]byte, lenToWrite+writeat) + copy(newbuf, buf) + s.parts[currentPart] = newbuf + buf = newbuf + } + + wrote := copy(buf[writeat:], p) + + s.currentPosition += int64(wrote) + + if int64(wrote) != lenToWrite { + return s.buffer(from+wrote, p[wrote:]) + } + + return from + wrote, nil +} + +func (s *seekWriter) flush(all bool) error { + for part, buf := range s.parts { + if all || int64(len(buf)) == s.partSize { + if err := s.uploadPart(part, buf); err != nil { + return err + } + continue + } + if !all { + continue + } + if err := s.uploadPart(part, buf); err != nil { + return err + } + } + return nil +} + +func (s *seekWriter) uploadPart(part int64, data []byte) error { + h := sha256.New() + if _, err := h.Write(data); err != nil { + return err + } + datasum := fmt.Sprintf("%x", h.Sum(nil)) + if sum, ok := s.uploadedParts[part]; ok && sum == datasum { + return nil + } + _, err := s.client.PutObject(context.Background(), + s.bucket, s.keyForPart(part), + bytes.NewReader(data), int64(len(data)), + minio.PutObjectOptions{ + ContentType: "application/octet-stream", + }, + ) + if err != nil { + return err + } + delete(s.parts, part) + s.uploadedParts[part] = datasum + return nil +} + +func (s *seekWriter) fetchRemotePart(part int64) ([]byte, error) { + object, err := s.client.GetObject(context.Background(), s.bucket, s.keyForPart(part), minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + body, err := ioutil.ReadAll(object) + if err != nil { + return nil, err + } + s.parts[part] = body + return body, nil +} + +func (s *seekWriter) keyForPart(part int64) string { + if path.Dir(s.key) == "" { + return fmt.Sprintf("%s_tmp/%d", s.key, part) + } + return path.Join( + path.Dir(s.key), + fmt.Sprintf("%s_tmp/%d", path.Base(s.key), part), + ) +}