Fixing execution model and making compatible with windows

This commit is contained in:
Rohit Garg
2018-06-26 19:29:50 +05:30
parent 53621db373
commit 25818c560b
7 changed files with 296 additions and 395 deletions

View File

@@ -1,29 +0,0 @@
package transcoder
import (
"fmt"
"log"
"os/exec"
)
type Worker struct {
Command string
Args string
Output chan string
}
func (cmd *Worker) Run() {
out, err := exec.Command(cmd.Command, cmd.Args).Output()
if err != nil {
log.Fatal(err)
}
cmd.Output <- string(out)
}
func Collect(c chan string) {
for {
msg := <-c
fmt.Printf("The command result is %s\n", msg)
}
}

View File

@@ -14,16 +14,17 @@ import (
"strings"
"regexp"
"strconv"
"io"
)
type Transcoder struct {
process *exec.Cmd
stdErrPipe io.ReadCloser
mediafile *models.Mediafile
configuration ffmpeg.Configuration
}
func (t *Transcoder) SetProccess(v *exec.Cmd) {
t.process = v
func (t *Transcoder) SetProcessStderrPipe(v io.ReadCloser) {
t.stdErrPipe = v
}
func (t *Transcoder) SetMediaFile(v *models.Mediafile) {
@@ -36,10 +37,6 @@ func (t *Transcoder) SetConfiguration(v ffmpeg.Configuration) {
/*** GETTERS ***/
func (t Transcoder) Process() *exec.Cmd {
return t.process
}
func (t Transcoder) MediaFile() *models.Mediafile {
return t.mediafile
}
@@ -52,17 +49,10 @@ func (t Transcoder) FFprobeExec() string {
return t.configuration.FfprobeBin
}
func (t Transcoder) GetCommand() string {
var rcommand string
rcommand = fmt.Sprintf("%s -y ", t.configuration.FfmpegBin)
func (t Transcoder) GetCommand() []string {
media := t.mediafile
rcommand += media.ToStrCommand()
rcommand := append([]string{"-y"}, media.ToStrCommand()...)
return rcommand
}
/*** FUNCTIONS ***/
@@ -84,25 +74,16 @@ func (t *Transcoder) Initialize(inputPath string, outputPath string) (error) {
return errors.New("error: transcoder.Initialize -> input file not found")
}
command := fmt.Sprintf("%s -i \"%s\" -print_format json -show_format -show_streams -show_error", configuration.FfprobeBin, inputPath)
command := []string{"-i", inputPath, "-print_format", "json", "-show_format", "-show_streams", "-show_error"}
cmd := exec.Command(configuration.ExecCmd, configuration.ExecArgs, command)
fmt.Println("FFprobe command: " + command)
cmd := exec.Command(configuration.FfprobeBin, command...)
var out bytes.Buffer
cmd.Stdout = &out
err = cmd.Start()
err = cmd.Run()
if err != nil {
return err
}
_, err = cmd.Process.Wait()
if err != nil {
return err
return fmt.Errorf("Failed FFPROBE (%s) with %s, message %s", command, err, out.String())
}
var Metadata models.Metadata
@@ -112,151 +93,154 @@ func (t *Transcoder) Initialize(inputPath string, outputPath string) (error) {
}
// Set new Mediafile
MediaFile := new(models.Mediafile)
MediaFile.SetMetadata(Metadata)
MediaFile.SetInputPath(inputPath)
MediaFile.SetOutputPath(outputPath)
// Set transcoder configuration
MediaFile := new(models.Mediafile)
MediaFile.SetMetadata(Metadata)
MediaFile.SetInputPath(inputPath)
MediaFile.SetOutputPath(outputPath)
// Set transcoder configuration
t.SetMediaFile(MediaFile)
t.SetConfiguration(configuration)
t.SetMediaFile(MediaFile)
t.SetConfiguration(configuration)
return nil
return nil
}
func (t *Transcoder) Run() (<-chan bool, error) {
done := make(chan bool)
var err error
func (t *Transcoder) Run() <-chan error {
done := make(chan error)
command := t.GetCommand()
fmt.Println("FFmpeg command: " + command)
proc := exec.Command(t.configuration.FfmpegBin, command...)
proc := exec.Command(t.configuration.ExecCmd, t.configuration.ExecArgs, command)
errStream, err := proc.StderrPipe()
if err != nil {
fmt.Println("Progress not available: "+ err.Error())
} else {
t.SetProcessStderrPipe(errStream)
}
t.SetProccess(proc)
out := &bytes.Buffer{}
proc.Stdout = out
go func() {
err := proc.Start()
err = proc.Start()
go func(err error, out *bytes.Buffer, errStream io.ReadCloser) {
defer func() {
if errStream != nil {
errStream.Close()
}
}()
if err != nil {
done <- fmt.Errorf("Failed Start FFMPEG (%s) with %s, message %s", command, err, out.String())
close(done)
return
}
proc.Wait()
done <- true
err = proc.Wait()
if err != nil {
err = fmt.Errorf("Failed Finish FFMPEG (%s) with %s message %s", command, err, out.String())
}
done <- err
close(done)
}()
return done, err
}(err, out, errStream)
return done
}
func (t Transcoder) Output() (<-chan models.Progress, error) {
func (t Transcoder) Output() <-chan models.Progress {
out := make(chan models.Progress)
var err error
go func() {
defer close(out)
go func() {
defer close(out)
if t.stdErrPipe == nil {
return
}
scanner := bufio.NewScanner(t.stdErrPipe)
filetype := utils.CheckFileType(t.MediaFile().Metadata().Streams)
stderr, stderror := t.Process().StderrPipe()
if err != nil {
err = stderror
return
}
scanner := bufio.NewScanner(stderr)
filetype := utils.CheckFileType(t.MediaFile().Metadata().Streams)
split := func(data []byte, atEOF bool) (advance int, token []byte, spliterror error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
Iframe := strings.Index(string(data), "frame=")
if filetype == "video" {
if Iframe > 0 {
return Iframe + 1, data[Iframe:], nil
}
} else {
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0:i], nil
}
}
if atEOF {
return len(data), data, nil
}
split := func(data []byte, atEOF bool) (advance int, token []byte, spliterror error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
scanner.Split(split)
buf := make([]byte, 2)
scanner.Buffer(buf, bufio.MaxScanTokenSize)
Iframe := strings.Index(string(data), "frame=")
var lastProgress float64
for scanner.Scan() {
Progress := new(models.Progress)
line := scanner.Text()
if strings.Contains(line, "time=") && strings.Contains(line, "bitrate=") {
var re= regexp.MustCompile(`=\s+`)
st := re.ReplaceAllString(line, `=`)
f := strings.Fields(st)
var framesProcessed string
var currentTime string
var currentBitrate string
for j := 0; j < len(f); j++ {
field := f[j]
fieldSplit := strings.Split(field, "=")
if len(fieldSplit) > 1 {
fieldname := strings.Split(field, "=")[0]
fieldvalue := strings.Split(field, "=")[1]
if fieldname == "frame" {
framesProcessed = fieldvalue
}
if fieldname == "time" {
currentTime = fieldvalue
}
if fieldname == "bitrate" {
currentBitrate = fieldvalue
}
}
}
timesec := utils.DurToSec(currentTime)
dursec, _ := strconv.ParseFloat(t.MediaFile().Metadata().Format.Duration, 64)
// Progress calculation
progress := (timesec * 100) / dursec
Progress.Progress = progress
Progress.CurrentBitrate = currentBitrate
Progress.FramesProcessed = framesProcessed
Progress.CurrentTime = currentTime
if progress != lastProgress {
lastProgress = progress
out <- *Progress
}
if filetype == "video" {
if Iframe > 0 {
return Iframe + 1, data[Iframe:], nil
}
} else {
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0:i], nil
}
}
if scerror := scanner.Err(); err != nil {
err = scerror
if atEOF {
return len(data), data, nil
}
}()
return out, err
return 0, nil, nil
}
scanner.Split(split)
buf := make([]byte, 2)
scanner.Buffer(buf, bufio.MaxScanTokenSize)
var lastProgress float64
for scanner.Scan() {
Progress := new(models.Progress)
line := scanner.Text()
// fmt.Println(line)
if strings.Contains(line, "time=") && strings.Contains(line, "bitrate=") {
var re= regexp.MustCompile(`=\s+`)
st := re.ReplaceAllString(line, `=`)
f := strings.Fields(st)
var framesProcessed string
var currentTime string
var currentBitrate string
for j := 0; j < len(f); j++ {
field := f[j]
fieldSplit := strings.Split(field, "=")
if len(fieldSplit) > 1 {
fieldname := strings.Split(field, "=")[0]
fieldvalue := strings.Split(field, "=")[1]
if fieldname == "frame" {
framesProcessed = fieldvalue
}
if fieldname == "time" {
currentTime = fieldvalue
}
if fieldname == "bitrate" {
currentBitrate = fieldvalue
}
}
}
timesec := utils.DurToSec(currentTime)
dursec, _ := strconv.ParseFloat(t.MediaFile().Metadata().Format.Duration, 64)
// Progress calculation
progress := (timesec * 100) / dursec
Progress.Progress = progress
Progress.CurrentBitrate = currentBitrate
Progress.FramesProcessed = framesProcessed
Progress.CurrentTime = currentTime
if progress != lastProgress {
lastProgress = progress
out <- *Progress
}
}
}
}()
return out
}