Added some doc

This commit is contained in:
Alvaro Garcia
2021-07-14 13:09:41 +02:00
parent 94b5bc7c43
commit 0e9ecaa458
6 changed files with 340 additions and 1 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.idea
# Binaries for programs and plugins
*.exe
*.exe~

View File

@@ -1,2 +1,63 @@
# transcoder
Low level Go program transcode videos concurrently using FFMEG
I wanted to play with goroutines in Go so I created this program.
Basically, this program uses `Context`, `WaitGroup` and `goroutines` to handle different
FFMPEG processes in background and the same time. It also parses the output and provides
the status and errors through channels.
This is an example about how to use it:
```go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
)
//
func monitor(ctx context.Context, t *Transcoder) {
for {
select {
case <-ctx.Done():
log.Println("Stop monitor")
return
case line := <-t.Status:
fmt.Println("DATA", line)
case line := <-t.Error:
fmt.Println("ERROR", line)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
t := NewTranscoder(ctx)
go monitor(ctx, t)
args := []string{
"-y", "-i", "my_file.mov",
"-c:v", "h264",
"-c:a", "mp3",
"-hls_time",
"1000",
"-hls_wrap",
"100",
"output.mp4"}
uuid, err := t.Submit(args)
log.Println("NEW PROCESS", uuid, err)
args[11] = "output2.mp4"
uuid, err = t.Submit(args)
log.Println("OTHER PROCESS", uuid, err)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
s := <-c
log.Println("SIGNAL", s)
cancel()
t.Wait()
}
```

76
ffmpeg.go Normal file
View File

@@ -0,0 +1,76 @@
package main
import (
"context"
"github.com/google/uuid"
"log"
"regexp"
"strings"
"sync"
)
//A single ffmpeg process. It just listens for a stop event or new ffmpeg data.
func ffmpeg(ctx context.Context, wg *sync.WaitGroup, uuid uuid.UUID, status chan<-*StreamStatus, error chan <- error, args []string) {
defer wg.Done()
log.Println("Booting ffmpeg process")
ch := make(chan string)
defer close(ch)
streamStatus := &StreamStatus{}
streamStatus.Uuid = uuid
go run(ctx, ch, error,"ffmpeg", args)
for {
select {
case <-ctx.Done():
log.Println("Shutting down ffmepg process, waiting for result")
return
case line := <-ch:
//We are only interested on the transcoding status information
if strings.HasPrefix(line, "frame") {
parseLine(line, streamStatus)
status <- streamStatus
}
}
}
}
type StreamStatus struct {
Uuid uuid.UUID `json:"uuid"`
Frame string `json:"frame"`
Fps string `json:"fps"`
Quality string `json:"quality"`
Time string `json:"time"`
Bitrate string `json:"bitrate"`
Speed string `json:"speed"`
Error error `json:"error"`
}
func parseLine(line string, status *StreamStatus) {
// Stream
// frame= 267 fps= 45 q=-1.0 size=N/A time=00:00:10.65 bitrate=N/A speed= 1.8x
//r, err := regexp.Compile("frame=([0-9]+) fps=([0-9]+) q=([0-9]+\\.[0-9]+) size= ([0-9]+[A-Za-z]+) time=([0-9]+:[0-9]+:[0-9]+)\\.[0-9]+ bitrate=N/A speed=([0-9]+\\.[0-9]+)x")
//res := make([]string, 7)
m := regexp.MustCompile("[ ]{2,}")
line = m.ReplaceAllString(line, "")
line = strings.ReplaceAll(line, "= ", "=")
elems := strings.Split(line, " ")
for _, part := range elems {
p := strings.Split(part, "=")
switch p[0] {
case "frame":
status.Frame = p[1]
case "fps":
status.Fps = p[1]
case "q":
status.Quality = p[1]
case "time":
status.Time = p[1]
case "bitrate":
status.Bitrate = p[1]
case "speed":
status.Speed = p[1]
}
}
}

62
main.go Normal file
View File

@@ -0,0 +1,62 @@
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
)
//This function will run in background consuming and displaying the FFMPEG status information
func monitor(ctx context.Context, t *Transcoder) {
for {
select {
case <-ctx.Done():
log.Println("Stop monitor")
return
case line := <-t.Status:
fmt.Println("DATA", line)
case line := <-t.Error:
fmt.Println("ERROR", line)
}
}
}
func main() {
// The context to control when the transcoding process should stop
ctx, cancel := context.WithCancel(context.Background())
t := NewTranscoder(ctx)
go monitor(ctx, t)
// Some example parameters
args := []string{
"-y", "-i", "my_file.mov",
"-c:v", "h264",
"-c:a", "mp3",
"-hls_time",
"1000",
"-hls_wrap",
"100",
"output.mp4"}
// Send now transcoding job using the "Submit" method
uuid, err := t.Submit(args)
log.Println("NEW PROCESS", uuid, err)
// You can stop it at any moment
//err = t.Stop(uuid)
args[11] = "output2.mp4"
// Send another job
uuid, err = t.Submit(args)
log.Println("OTHER PROCESS", uuid, err)
// To stop the processing when a signal is received
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
s := <-c
log.Println("SIGNAL", s)
//Once we receive the signal to stop, we cancel all transcoding jobs and wait until all is stopped
cancel()
t.Wait()
}

62
process.go Normal file
View File

@@ -0,0 +1,62 @@
package main
import (
"bufio"
"bytes"
"context"
"fmt"
"log"
"os/exec"
)
func splitFunction(data []byte, atEOF bool) (advance int, token []byte, spliterror error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0:i], nil
}
if i := bytes.IndexByte(data, '\r'); i >= 0 {
// We have a cr terminated line
return i + 1, data[0:i], nil
}
if atEOF {
return len(data), data, nil
}
return 0, nil, nil
}
//Run a command and stream the otput
func run(ctx context.Context, status chan<- string, error chan <- error, command string, args []string) {
cmd := exec.CommandContext(ctx, command, args...)
out, err := cmd.StderrPipe()
if err != nil {
error<-err
return
}
err = cmd.Start()
if err != nil {
error<-err
return
}
scanner := bufio.NewScanner(out)
scanner.Split(splitFunction)
var lastLine string
for scanner.Scan() {
lastLine = scanner.Text()
select {
case <-ctx.Done():
log.Println("Stopping command", command)
error<-nil
return
case status <- lastLine:
}
}
error <- fmt.Errorf("program exited: %s", lastLine)
}

77
transcoder.go Normal file
View File

@@ -0,0 +1,77 @@
package main
/*
Here we have two kind of concurrent jobs: the ffmpeg and the transcoder manager.
The ffmpeg jobs just run the ffmpeg command and stream the status.
The transcoder is in charge of managing all these processes (killing them, reading from them, creating them...).
This is why we have two WaitGroup (because the ffmpeg processes should die in first place, then the transcoder).
*/
import (
"context"
"fmt"
"github.com/google/uuid"
"log"
"sync"
)
func NewTranscoder(ctx context.Context) *Transcoder {
t := Transcoder{}
t.Ctx = ctx
t.TranscoderWaitGroup = sync.WaitGroup{}
t.ProcessesWaitGroup = sync.WaitGroup{}
t.Status = make(chan *StreamStatus)
t.Error = make(chan error)
t.ProcessesCancels = make(map[uuid.UUID]context.CancelFunc)
t.TranscoderWaitGroup.Add(1)
go t.run()
return &t
}
type Transcoder struct {
Ctx context.Context
TranscoderWaitGroup sync.WaitGroup
ProcessesWaitGroup sync.WaitGroup
ProcessesCancels map[uuid.UUID]context.CancelFunc
Status chan *StreamStatus
Error chan error
}
func (t *Transcoder) Submit(args []string) (uuid.UUID, error) {
t.ProcessesWaitGroup.Add(1)
processUuid, err := uuid.NewUUID()
if err != nil {
return processUuid, err
}
ctx, cancel := context.WithCancel(context.Background())
t.ProcessesCancels[processUuid] = cancel
go ffmpeg(ctx, &t.ProcessesWaitGroup, processUuid, t.Status, t.Error, args)
return processUuid, nil
}
func (t *Transcoder) Stop(processUuid uuid.UUID) error {
if cancel, ok := t.ProcessesCancels[processUuid]; ok {
cancel()
delete(t.ProcessesCancels, processUuid)
return nil
} else {
return fmt.Errorf("process not found")
}
}
func (t *Transcoder) run() {
defer t.TranscoderWaitGroup.Done()
log.Println("Transcoder ready, waiting for shutdown signal")
<-t.Ctx.Done()
log.Println("Transcoder shutting down...")
for processUuid, cancel := range t.ProcessesCancels {
log.Println("Stopping encoding:", processUuid)
cancel()
}
log.Println("Waiting for all processes to stop...")
t.ProcessesWaitGroup.Wait()
log.Println("All processed stopped, bye")
}
func (t *Transcoder) Wait() {
t.ProcessesWaitGroup.Wait()
}