diff --git a/examples/example_test.go b/examples/example_test.go index 9e944a7..4304556 100644 --- a/examples/example_test.go +++ b/examples/example_test.go @@ -3,9 +3,17 @@ package examples import ( "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/disintegration/imaging" + "github.com/stretchr/testify/assert" + ffmpeg "github.com/u2takey/ffmpeg-go" ) +// +// More simple examples please refer to ffmpeg_test.go +// + func TestExampleStream(t *testing.T) { ExampleStream("./sample_data/in1.mp4", "./sample_data/out1.mp4", false) } @@ -26,7 +34,18 @@ func TestExampleShowProgress(t *testing.T) { ExampleShowProgress("./sample_data/in1.mp4", "./sample_data/out2.mp4") } -func TestExampleOpenCvFaceDetect(t *testing.T) { - ExampleFaceDetection("./sample_data/head-pose-face-detection-male-short.mp4", - "./sample_data/haarcascade_frontalface_default.xml") +func TestSimpleS3StreamExample(t *testing.T) { + err := ffmpeg.Input("./sample_data/in1.mp4", nil). + Output("s3://data-1251825869/test_out.ts", ffmpeg.KwArgs{ + "aws_config": &aws.Config{ + Credentials: credentials.NewStaticCredentials("xx", "yyy", ""), + //Endpoint: aws.String("xx"), + Region: aws.String("yyy"), + }, + // outputS3 use stream output, so you can only use supported format + // if you want mp4 format for example, you can output it to a file, and then call s3 sdk to do upload + "format": "mpegts", + }). + Run() + assert.Nil(t, err) } diff --git a/examples/opencv.go b/examples/opencv.go index b74d9d9..00ee6cf 100644 --- a/examples/opencv.go +++ b/examples/opencv.go @@ -1,3 +1,5 @@ +// +build gocv + package examples import ( @@ -11,6 +13,11 @@ import ( "gocv.io/x/gocv" ) +func TestExampleOpenCvFaceDetect(t *testing.T) { + ExampleFaceDetection("./sample_data/head-pose-face-detection-male-short.mp4", + "./sample_data/haarcascade_frontalface_default.xml") +} + func readProcess(infileName string, writer io.WriteCloser) <-chan error { log.Println("Starting ffmpeg process1") done := make(chan error) diff --git a/ffmpeg.go b/ffmpeg.go index 67a8517..ea8a4f1 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -1,7 +1,16 @@ package ffmpeg_go import ( + "context" "errors" + "io" + "log" + "os" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" ) // Input file URL (ffmpeg ``-i`` option) @@ -86,7 +95,47 @@ func Output(streams []*Stream, fileName string, kwargs ...KwArgs) *Stream { func (s *Stream) Output(fileName string, kwargs ...KwArgs) *Stream { if s.Type != "FilterableStream" { - panic("cannot output on non-FilterableStream") + log.Panic("cannot output on non-FilterableStream") + } + if strings.HasPrefix(fileName, "s3://") { + return s.outputS3Stream(fileName, kwargs...) } return Output([]*Stream{s}, fileName, kwargs...) } + +func (s *Stream) outputS3Stream(fileName string, kwargs ...KwArgs) *Stream { + r, w := io.Pipe() + fileL := strings.SplitN(strings.TrimPrefix(fileName, "s3://"), "/", 2) + if len(fileL) != 2 { + log.Panic("s3 file format not valid") + } + args := MergeKwArgs(kwargs) + awsConfig := args.PopDefault("aws_config", &aws.Config{}).(*aws.Config) + bucket, key := fileL[0], fileL[1] + o := Output([]*Stream{s}, "pipe:", args). + WithOutput(w, os.Stdout) + done := make(chan struct{}) + runHook := RunHook{ + f: func() { + defer func() { + done <- struct{}{} + }() + + sess, err := session.NewSession(awsConfig) + uploader := s3manager.NewUploader(sess) + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: &bucket, + Key: &key, + Body: r, + }) + //fmt.Println(ioutil.ReadAll(r)) + if err != nil { + log.Println("upload fail", err) + } + }, + done: done, + closer: w, + } + o.Context = context.WithValue(o.Context, "run_hook", &runHook) + return o +} diff --git a/go.mod b/go.mod index 8bc4a33..1e16544 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/u2takey/ffmpeg-go go 1.14 require ( + github.com/aws/aws-sdk-go v1.38.20 github.com/disintegration/imaging v1.6.2 github.com/stretchr/testify v1.4.0 github.com/tidwall/gjson v1.6.3 diff --git a/go.sum b/go.sum index 3dbd405..630c9c0 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aws/aws-sdk-go v1.38.20 h1:QbzNx/tdfATbdKfubBpkt84OM6oBkxQZRw6+bW2GyeA= +github.com/aws/aws-sdk-go v1.38.20/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -9,6 +11,9 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -17,6 +22,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/selinux v1.5.2/go.mod h1:yTcKuYAh6R95iDpefGLQaPaRwJFwyzAJufJyiTt7s0g= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= @@ -34,11 +40,19 @@ github.com/u2takey/go-utils v0.0.0-20200713025200-4704d09fc2c7 h1:PT7mE8HJE1mwaS github.com/u2takey/go-utils v0.0.0-20200713025200-4704d09fc2c7/go.mod h1:ATqKFpgjUIlhGRs8j59gXmu8Cmpo1QQEHV6vwu1hs28= gocv.io/x/gocv v0.25.0 h1:vM50jL3v9OEqWSi+urelX5M1ptZeFWA/VhGPvdTqsJU= gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 h1:hVwzHzIUGRjiF7EcUjqNxk3NCfkPxbDKRdnNE1Rpg0U= golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/node.go b/node.go index 613c599..c9ca2a9 100644 --- a/node.go +++ b/node.go @@ -17,6 +17,14 @@ type Stream struct { Context context.Context } +type RunHook struct { + f func() + done <-chan struct{} + closer interface { + Close() error + } +} + func NewStream(node *Node, streamType string, label Label, selector Selector) *Stream { return &Stream{ Node: node, diff --git a/run.go b/run.go index 3997bd1..010ce16 100644 --- a/run.go +++ b/run.go @@ -242,9 +242,15 @@ func (s *Stream) Compile() *exec.Cmd { } func (s *Stream) Run() error { - err := s.Compile().Run() - if err != nil { - return err + if s.Context.Value("run_hook") != nil { + hook := s.Context.Value("run_hook").(*RunHook) + go hook.f() + defer func() { + if hook.closer != nil { + _ = hook.closer.Close() + } + <-hook.done + }() } - return nil + return s.Compile().Run() } diff --git a/utils.go b/utils.go index f809d82..e251168 100644 --- a/utils.go +++ b/utils.go @@ -175,6 +175,14 @@ func (a KwArgs) GetDefault(k string, defaultV interface{}) interface{} { return defaultV } +func (a KwArgs) PopDefault(k string, defaultV interface{}) interface{} { + if v, ok := a[k]; ok { + defer delete(a, k) + return v + } + return defaultV +} + func ConvertKwargsToCmdLineArgs(kwargs KwArgs) []string { var keys, args []string for k := range kwargs {