Files
ffmpeg-go/run.go
Alex Gustafsson 205828f3ee Add support for compilation options
Add options to the Compile function, and in turn, the Run function
in order to modify the compiled command instance before executing
it. This is helpful to, for example, specify that the command
should be run in a separate process group in order for ffmpeg not
to react on SIGINTs sent to the parent proces.
2022-03-22 11:32:09 +01:00

284 lines
7.6 KiB
Go

package ffmpeg_go
import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"sort"
"strings"
"syscall"
"time"
)
func getInputArgs(node *Node) []string {
var args []string
if node.name == "input" {
kwargs := node.kwargs.Copy()
filename := kwargs.PopString("filename")
format := kwargs.PopString("format")
videoSize := kwargs.PopString("video_size")
if format != "" {
args = append(args, "-f", format)
}
if videoSize != "" {
args = append(args, "-video_size", videoSize)
}
args = append(args, ConvertKwargsToCmdLineArgs(kwargs)...)
args = append(args, "-i", filename)
} else {
panic("unsupported node input name")
}
return args
}
func formatInputStreamName(streamNameMap map[string]string, edge DagEdge, finalArg bool) string {
prefix := streamNameMap[fmt.Sprintf("%d%s", edge.UpStreamNode.Hash(), edge.UpStreamLabel)]
suffix := ""
format := "[%s%s]"
if edge.UpStreamSelector != "" {
suffix = fmt.Sprintf(":%s", edge.UpStreamSelector)
}
if finalArg && edge.UpStreamNode.(*Node).nodeType == "InputNode" {
format = "%s%s"
}
return fmt.Sprintf(format, prefix, suffix)
}
func formatOutStreamName(streamNameMap map[string]string, edge DagEdge) string {
return fmt.Sprintf("[%s]", streamNameMap[fmt.Sprintf("%d%s", edge.UpStreamNode.Hash(), edge.UpStreamLabel)])
}
func _getFilterSpec(node *Node, outOutingEdgeMap map[Label][]NodeInfo, streamNameMap map[string]string) string {
var input, output []string
for _, e := range node.GetInComingEdges() {
input = append(input, formatInputStreamName(streamNameMap, e, false))
}
outEdges := GetOutGoingEdges(node, outOutingEdgeMap)
for _, e := range outEdges {
output = append(output, formatOutStreamName(streamNameMap, e))
}
return fmt.Sprintf("%s%s%s", strings.Join(input, ""), node.GetFilter(outEdges), strings.Join(output, ""))
}
func _getAllLabelsInSorted(m map[Label]NodeInfo) []Label {
var r []Label
for a := range m {
r = append(r, a)
}
sort.Slice(r, func(i, j int) bool {
return r[i] < r[j]
})
return r
}
func _getAllLabelsSorted(m map[Label][]NodeInfo) []Label {
var r []Label
for a := range m {
r = append(r, a)
}
sort.Slice(r, func(i, j int) bool {
return r[i] < r[j]
})
return r
}
func _allocateFilterStreamNames(nodes []*Node, outOutingEdgeMaps map[int]map[Label][]NodeInfo, streamNameMap map[string]string) {
sc := 0
for _, n := range nodes {
om := outOutingEdgeMaps[n.Hash()]
// todo sort
for _, l := range _getAllLabelsSorted(om) {
if len(om[l]) > 1 {
panic(fmt.Sprintf(`encountered %s with multiple outgoing edges
with same upstream label %s; a 'split'' filter is probably required`, n.name, l))
}
streamNameMap[fmt.Sprintf("%d%s", n.Hash(), l)] = fmt.Sprintf("s%d", sc)
sc += 1
}
}
}
func _getFilterArg(nodes []*Node, outOutingEdgeMaps map[int]map[Label][]NodeInfo, streamNameMap map[string]string) string {
_allocateFilterStreamNames(nodes, outOutingEdgeMaps, streamNameMap)
var filterSpec []string
for _, n := range nodes {
filterSpec = append(filterSpec, _getFilterSpec(n, outOutingEdgeMaps[n.Hash()], streamNameMap))
}
return strings.Join(filterSpec, ";")
}
func _getGlobalArgs(node *Node) []string {
return node.args
}
func _getOutputArgs(node *Node, streamNameMap map[string]string) []string {
if node.name != "output" {
panic("Unsupported output node")
}
var args []string
if len(node.GetInComingEdges()) == 0 {
panic("Output node has no mapped streams")
}
for _, e := range node.GetInComingEdges() {
streamName := formatInputStreamName(streamNameMap, e, true)
if streamName != "0" || len(node.GetInComingEdges()) > 1 {
args = append(args, "-map", streamName)
}
}
kwargs := node.kwargs.Copy()
filename := kwargs.PopString("filename")
if kwargs.HasKey("format") {
args = append(args, "-f", kwargs.PopString("format"))
}
if kwargs.HasKey("video_bitrate") {
args = append(args, "-b:v", kwargs.PopString("video_bitrate"))
}
if kwargs.HasKey("audio_bitrate") {
args = append(args, "-b:a", kwargs.PopString("audio_bitrate"))
}
if kwargs.HasKey("video_size") {
args = append(args, "-video_size", kwargs.PopString("video_size"))
}
args = append(args, ConvertKwargsToCmdLineArgs(kwargs)...)
args = append(args, filename)
return args
}
func (s *Stream) GetArgs() []string {
var args []string
nodes := getStreamSpecNodes([]*Stream{s})
var dagNodes []DagNode
streamNameMap := map[string]string{}
for i := range nodes {
dagNodes = append(dagNodes, nodes[i])
}
sorted, outGoingMap, err := TopSort(dagNodes)
if err != nil {
panic(err)
}
DebugNodes(sorted)
DebugOutGoingMap(sorted, outGoingMap)
var inputNodes, outputNodes, globalNodes, filterNodes []*Node
for i := range sorted {
n := sorted[i].(*Node)
switch n.nodeType {
case "InputNode":
streamNameMap[fmt.Sprintf("%d", n.Hash())] = fmt.Sprintf("%d", len(inputNodes))
inputNodes = append(inputNodes, n)
case "OutputNode":
outputNodes = append(outputNodes, n)
case "GlobalNode":
globalNodes = append(globalNodes, n)
case "FilterNode":
filterNodes = append(filterNodes, n)
}
}
// input args from inputNodes
for _, n := range inputNodes {
args = append(args, getInputArgs(n)...)
}
// filter args from filterNodes
filterArgs := _getFilterArg(filterNodes, outGoingMap, streamNameMap)
if filterArgs != "" {
args = append(args, "-filter_complex", filterArgs)
}
// output args from outputNodes
for _, n := range outputNodes {
args = append(args, _getOutputArgs(n, streamNameMap)...)
}
// global args with outputNodes
for _, n := range globalNodes {
args = append(args, _getGlobalArgs(n)...)
}
if s.Context.Value("OverWriteOutput") != nil {
args = append(args, "-y")
}
return args
}
func (s *Stream) WithTimeout(timeOut time.Duration) *Stream {
if timeOut > 0 {
s.Context, _ = context.WithTimeout(s.Context, timeOut)
}
return s
}
func (s *Stream) OverWriteOutput() *Stream {
s.Context = context.WithValue(s.Context, "OverWriteOutput", struct{}{})
return s
}
func (s *Stream) WithInput(reader io.Reader) *Stream {
s.Context = context.WithValue(s.Context, "Stdin", reader)
return s
}
func (s *Stream) WithOutput(out ...io.Writer) *Stream {
if len(out) > 0 {
s.Context = context.WithValue(s.Context, "Stdout", out[0])
}
if len(out) > 1 {
s.Context = context.WithValue(s.Context, "Stderr", out[1])
}
return s
}
func (s *Stream) WithErrorOutput(out io.Writer) *Stream {
s.Context = context.WithValue(s.Context, "Stderr", out)
return s
}
func (s *Stream) ErrorToStdOut() *Stream {
return s.WithErrorOutput(os.Stdout)
}
type CompilationOption func(s *Stream, cmd *exec.Cmd)
// SeparateProcessGroup ensures that the command is run in a separate process
// group. This is useful to enable handling of signals such as SIGINT without
// propagating them to the ffmpeg process.
func SeparateProcessGroup() CompilationOption {
return func(s *Stream, cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
}
}
// for test
func (s *Stream) Compile(options ...CompilationOption) *exec.Cmd {
args := s.GetArgs()
cmd := exec.CommandContext(s.Context, "ffmpeg", args...)
if a, ok := s.Context.Value("Stdin").(io.Reader); ok {
cmd.Stdin = a
}
if a, ok := s.Context.Value("Stdout").(io.Writer); ok {
cmd.Stdout = a
}
if a, ok := s.Context.Value("Stderr").(io.Writer); ok {
cmd.Stderr = a
}
for _, option := range options {
option(s, cmd)
}
log.Printf("compiled command: ffmpeg %s\n", strings.Join(args, " "))
return cmd
}
func (s *Stream) Run(options ...CompilationOption) error {
if s.Context.Value("run_hook") != nil {
hook := s.Context.Value("run_hook").(*RunHook)
go hook.f()
defer func() {
if hook.closer != nil {
_ = hook.closer.Close()
}
<-hook.done
}()
}
return s.Compile(options...).Run()
}