add s3 stream support

This commit is contained in:
wanglei.w
2021-04-15 17:47:07 +08:00
parent bb39562fae
commit 180149a7c9
8 changed files with 120 additions and 8 deletions

View File

@@ -1,7 +1,16 @@
package ffmpeg_go
import (
"context"
"errors"
"io"
"log"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// Input file URL (ffmpeg ``-i`` option)
@@ -86,7 +95,47 @@ func Output(streams []*Stream, fileName string, kwargs ...KwArgs) *Stream {
func (s *Stream) Output(fileName string, kwargs ...KwArgs) *Stream {
if s.Type != "FilterableStream" {
panic("cannot output on non-FilterableStream")
log.Panic("cannot output on non-FilterableStream")
}
if strings.HasPrefix(fileName, "s3://") {
return s.outputS3Stream(fileName, kwargs...)
}
return Output([]*Stream{s}, fileName, kwargs...)
}
func (s *Stream) outputS3Stream(fileName string, kwargs ...KwArgs) *Stream {
r, w := io.Pipe()
fileL := strings.SplitN(strings.TrimPrefix(fileName, "s3://"), "/", 2)
if len(fileL) != 2 {
log.Panic("s3 file format not valid")
}
args := MergeKwArgs(kwargs)
awsConfig := args.PopDefault("aws_config", &aws.Config{}).(*aws.Config)
bucket, key := fileL[0], fileL[1]
o := Output([]*Stream{s}, "pipe:", args).
WithOutput(w, os.Stdout)
done := make(chan struct{})
runHook := RunHook{
f: func() {
defer func() {
done <- struct{}{}
}()
sess, err := session.NewSession(awsConfig)
uploader := s3manager.NewUploader(sess)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: &bucket,
Key: &key,
Body: r,
})
//fmt.Println(ioutil.ReadAll(r))
if err != nil {
log.Println("upload fail", err)
}
},
done: done,
closer: w,
}
o.Context = context.WithValue(o.Context, "run_hook", &runHook)
return o
}