Files
ffmpeg-go/node.go

280 lines
6.2 KiB
Go

package ffmpeg_go
import (
"context"
"errors"
"fmt"
"strings"
"github.com/u2takey/go-utils/sets"
)
// Stream is a handle to one outgoing stream of a Node, addressed by a label
// and an optional selector.
type Stream struct {
// Node is the node this stream originates from.
Node *Node
// Label identifies the stream among the node's outgoing streams.
Label Label
// Selector optionally narrows the stream (Audio and Video use "a" and "v").
// Get panics when it is already set.
Selector Selector
// Type is the stream type name; Node.Stream fills it with the node's
// outgoing stream type and node constructors validate inputs against it.
Type string
// FfmpegPath is set to "ffmpeg" by NewStream.
// NOTE(review): presumably the executable invoked when running — confirm.
FfmpegPath string
// Context is set to context.Background() by NewStream.
Context context.Context
}
// RunHook groups a callback, a receive-only completion channel and a closer.
// Nothing in this part of the file reads or writes these fields.
// NOTE(review): the roles below are inferred from names and types only —
// confirm against the code that runs the command.
type RunHook struct {
// f is a parameterless callback.
f func()
// done is a receive-only signalling channel.
done <-chan struct{}
// closer is any value exposing Close() error.
closer interface {
Close() error
}
}
// NewStream builds a Stream for the given node, stream type, label and
// selector. FfmpegPath starts as "ffmpeg" and Context as
// context.Background().
func NewStream(node *Node, streamType string, label Label, selector Selector) *Stream {
	s := new(Stream)
	s.Node, s.Type = node, streamType
	s.Label, s.Selector = label, selector
	s.FfmpegPath = "ffmpeg"
	s.Context = context.Background()
	return s
}
// Hash returns the sum of the owning node's hash and the hash of the
// stream's label. The selector does not take part in the result.
func (s *Stream) Hash() int {
	nodeHash := s.Node.Hash()
	labelHash := getHash(s.Label)
	return nodeHash + labelHash
}
// Equal reports whether s and other produce the same Hash value.
func (s *Stream) Equal(other Stream) bool {
	mine := s.Hash()
	theirs := other.Hash()
	return mine == theirs
}
// String renders the stream as its node description followed by the label
// and selector.
func (s *Stream) String() string {
	const layout = "node: %s, label: %s, selector: %s"
	return fmt.Sprintf(layout, s.Node.String(), s.Label, s.Selector)
}
// Get returns a new stream on the same node and label, with index as its
// selector. It panics if s already carries a selector.
func (s *Stream) Get(index string) *Stream {
	if s.Selector == "" {
		return s.Node.Stream(s.Label, Selector(index))
	}
	panic(errors.New("stream already has a selector"))
}
// Audio is shorthand for Get("a"); like Get, it panics when the stream
// already has a selector.
func (s *Stream) Audio() *Stream {
	const audioSelector = "a"
	return s.Get(audioSelector)
}
// Video is shorthand for Get("v"); like Get, it panics when the stream
// already has a selector.
func (s *Stream) Video() *Stream {
	const videoSelector = "v"
	return s.Get(videoSelector)
}
// getStreamMap indexes the given streams by their position in streamSpec.
// The result is always a non-nil map.
func getStreamMap(streamSpec []*Stream) map[int]*Stream {
	byIndex := make(map[int]*Stream, len(streamSpec))
	for idx, stream := range streamSpec {
		byIndex[idx] = stream
	}
	return byIndex
}
// getStreamMapNodes collects the Node of every stream in streamMap. The
// result follows map iteration order, so its ordering is unspecified; an
// empty map yields a nil slice.
func getStreamMapNodes(streamMap map[int]*Stream) []*Node {
	var nodes []*Node
	for _, stream := range streamMap {
		nodes = append(nodes, stream.Node)
	}
	return nodes
}
// getStreamSpecNodes returns the Node of every stream in streamSpec, in the
// same order as streamSpec. An empty streamSpec yields a nil slice.
//
// The slice is walked directly rather than through an intermediate map, so
// the order of the result is deterministic instead of depending on map
// iteration order.
func getStreamSpecNodes(streamSpec []*Stream) []*Node {
	if len(streamSpec) == 0 {
		return nil
	}
	nodes := make([]*Node, 0, len(streamSpec))
	for _, stream := range streamSpec {
		nodes = append(nodes, stream.Node)
	}
	return nodes
}
// Node is one vertex of the stream graph: it consumes the streams in
// streamSpec and hands out streams of outgoingStreamType.
type Node struct {
// streamSpec holds the incoming streams, in positional order.
streamSpec []*Stream
// name is the node's name; it is returned by ShortRepr and, for filter
// nodes, used as the filter name by GetFilter.
name string
// incomingStreamTypes is the set of stream types accepted as input.
incomingStreamTypes sets.String
// outgoingStreamType is the type given to streams created by Stream.
outgoingStreamType string
// minInputs and maxInputs bound the number of incoming streams; a negative
// value disables the corresponding check.
minInputs int
maxInputs int
// args and kwargs are the positional and keyword arguments of the node.
args []string
kwargs KwArgs
// nodeType is the kind of node, e.g. "InputNode", "FilterNode",
// "OutputNode", "MergeOutputsNode" or "GlobalNode".
nodeType string
}
// NewNode assembles a Node from the given fields and validates its inputs.
// It panics when the number of incoming streams falls outside
// [minInputs, maxInputs] (a negative bound is not enforced) or when an
// incoming stream has a type that is not in incomingStreamTypes.
func NewNode(streamSpec []*Stream,
	name string,
	incomingStreamTypes sets.String,
	outgoingStreamType string,
	minInputs int,
	maxInputs int,
	args []string,
	kwargs KwArgs,
	nodeType string) *Node {
	node := new(Node)
	node.name = name
	node.nodeType = nodeType
	node.streamSpec = streamSpec
	node.incomingStreamTypes = incomingStreamTypes
	node.outgoingStreamType = outgoingStreamType
	node.minInputs, node.maxInputs = minInputs, maxInputs
	node.args, node.kwargs = args, kwargs

	// Count is validated before types, matching the order callers observe.
	node.__checkInputLen()
	node.__checkInputTypes()
	return node
}
// NewInputNode creates an "InputNode": it accepts no incoming streams and
// produces "FilterableStream" streams.
func NewInputNode(name string, args []string, kwargs KwArgs) *Node {
	const (
		outgoing = "FilterableStream"
		kind     = "InputNode"
	)
	return NewNode(nil, name, nil, outgoing, 0, 0, args, kwargs, kind)
}
// NewFilterNode creates a "FilterNode" that takes between one and maxInput
// "FilterableStream" inputs (a negative maxInput lifts the upper bound) and
// produces "FilterableStream" streams. It panics if the inputs are invalid.
func NewFilterNode(name string, streamSpec []*Stream, maxInput int, args []string, kwargs KwArgs) *Node {
	const (
		streamKind = "FilterableStream"
		kind       = "FilterNode"
	)
	accepted := sets.NewString(streamKind)
	return NewNode(streamSpec, name, accepted, streamKind, 1, maxInput, args, kwargs, kind)
}
// NewOutputNode creates an "OutputNode" that takes one or more
// "FilterableStream" inputs and produces "OutputStream" streams. It panics
// if the inputs are invalid.
func NewOutputNode(name string, streamSpec []*Stream, args []string, kwargs KwArgs) *Node {
	const (
		incoming = "FilterableStream"
		outgoing = "OutputStream"
		kind     = "OutputNode"
	)
	accepted := sets.NewString(incoming)
	// maxInputs of -1 means no upper bound on the number of inputs.
	return NewNode(streamSpec, name, accepted, outgoing, 1, -1, args, kwargs, kind)
}
// NewMergeOutputsNode creates a "MergeOutputsNode" that takes one or more
// "OutputStream" inputs and produces an "OutputStream". It carries no args
// or kwargs and panics if the inputs are invalid.
func NewMergeOutputsNode(name string, streamSpec []*Stream) *Node {
	const (
		streamKind = "OutputStream"
		kind       = "MergeOutputsNode"
	)
	accepted := sets.NewString(streamKind)
	// maxInputs of -1 means no upper bound on the number of inputs.
	return NewNode(streamSpec, name, accepted, streamKind, 1, -1, nil, nil, kind)
}
// NewGlobalNode creates a "GlobalNode" that takes exactly one
// "OutputStream" input and produces an "OutputStream". It panics if the
// inputs are invalid.
func NewGlobalNode(name string, streamSpec []*Stream, args []string, kwargs KwArgs) *Node {
	const (
		streamKind = "OutputStream"
		kind       = "GlobalNode"
	)
	accepted := sets.NewString(streamKind)
	return NewNode(streamSpec, name, accepted, streamKind, 1, 1, args, kwargs, kind)
}
// __checkInputLen panics when the number of incoming streams is below
// minInputs or above maxInputs. A negative bound is not enforced.
func (n *Node) __checkInputLen() {
	// The index map built by getStreamMap has one entry per stream, so the
	// slice length is the same count.
	got := len(n.streamSpec)
	switch {
	case n.minInputs >= 0 && got < n.minInputs:
		panic(fmt.Sprintf("Expected at least %d input stream(s); got %d", n.minInputs, got))
	case n.maxInputs >= 0 && got > n.maxInputs:
		panic(fmt.Sprintf("Expected at most %d input stream(s); got %d", n.maxInputs, got))
	}
}
// __checkInputTypes panics when any incoming stream has a Type that is not
// in incomingStreamTypes.
//
// Streams are checked in streamSpec order, so when several inputs are
// invalid the panic always names the first one instead of whichever a map
// iteration happened to visit first.
func (n *Node) __checkInputTypes() {
	for _, stream := range n.streamSpec {
		if n.incomingStreamTypes.Has(stream.Type) {
			continue
		}
		panic(fmt.Sprintf("Expected incoming stream(s) to be of one of the following types: %s; got %s", n.incomingStreamTypes, stream.Type))
	}
}
// __getIncomingEdgeMap maps each incoming stream to a NodeInfo describing
// its upstream node, label and selector. Keys are the stream's position in
// streamSpec, zero-padded to six digits.
func (n *Node) __getIncomingEdgeMap() map[Label]NodeInfo {
	edges := make(map[Label]NodeInfo, len(n.streamSpec))
	for pos, stream := range n.streamSpec {
		key := Label(fmt.Sprintf("%06v", pos))
		edges[key] = NodeInfo{
			Node:     stream.Node,
			Label:    stream.Label,
			Selector: stream.Selector,
		}
	}
	return edges
}
// Hash combines, by plain addition, one hash per incoming edge (covering
// the downstream label, the upstream node's hash, its label and selector)
// with the hashes of args and kwargs.
//
// The node's name and nodeType are not part of the hash.
// NOTE(review): two nodes that differ only by name therefore hash equal —
// confirm this is intended.
func (n *Node) Hash() int {
	total := 0
	for downLabel, upstream := range n.IncomingEdgeMap() {
		edgeKey := fmt.Sprintf("%s%d%s%s", downLabel, upstream.Node.Hash(), upstream.Label, upstream.Selector)
		total += getHash(edgeKey)
	}
	total += getHash(n.args)
	return total + getHash(n.kwargs)
}
// String renders the node as "name (args) <kwargs>".
func (n *Node) String() string {
	argText := getString(n.args)
	kwargText := getString(n.kwargs)
	return fmt.Sprintf("%s (%s) <%s>", n.name, argText, kwargText)
}
// Equal reports whether n and other produce the same Hash value.
func (n *Node) Equal(other DagNode) bool {
	mine := n.Hash()
	theirs := other.Hash()
	return mine == theirs
}
// ShortRepr returns the node's name.
func (n *Node) ShortRepr() string {
	repr := n.name
	return repr
}
// IncomingEdgeMap returns a freshly built map from positional label to the
// upstream NodeInfo of each incoming stream.
func (n *Node) IncomingEdgeMap() map[Label]NodeInfo {
	edges := n.__getIncomingEdgeMap()
	return edges
}
// GetInComingEdges returns the node's incoming edges, as computed by the
// package-level GetInComingEdges from the node's incoming edge map.
func (n *Node) GetInComingEdges() []DagEdge {
	edgeMap := n.IncomingEdgeMap()
	return GetInComingEdges(n, edgeMap)
}
// Stream creates a new outgoing stream of this node with the given label
// and selector; its type is the node's outgoing stream type.
func (n *Node) Stream(label Label, selector Selector) *Stream {
	streamType := n.outgoingStreamType
	return NewStream(n, streamType, label, selector)
}
// Get returns an outgoing stream described by a. When a contains exactly
// one ':' it is read as "label:selector"; otherwise the whole string is the
// label and the selector is empty.
func (n *Node) Get(a string) *Stream {
	sep := strings.Index(a, ":")
	// No colon, or more than one: treat the whole string as the label.
	if sep < 0 || sep != strings.LastIndex(a, ":") {
		return n.Stream(Label(a), "")
	}
	return n.Stream(Label(a[:sep]), Selector(a[sep+1:]))
}
// GetFilter renders a filter node as "name=arg1:arg2:key=value", escaped
// for use in a filter graph. It panics when called on a node whose
// nodeType is not "FilterNode".
//
// For "split" and "asplit" the positional arguments are replaced by the
// number of outgoing edges. Keyword arguments follow the positional ones in
// sorted key order; a keyword whose value renders as an empty string is
// emitted as the bare key.
func (n *Node) GetFilter(outgoingEdges []DagEdge) string {
	if n.nodeType != "FilterNode" {
		panic("call GetFilter on non-FilterNode")
	}

	// Build the parameter list in a private slice. Appending to n.args
	// directly could write into spare capacity of a backing array that is
	// shared with the node or with whoever created it.
	var args []string
	if n.name == "split" || n.name == "asplit" {
		args = []string{fmt.Sprintf("%d", len(outgoingEdges))}
	} else {
		args = append(args, n.args...)
	}

	// Positional args are intentionally not escaped one by one here:
	// args = Args(args).EscapeWith("\\'=:")
	kwargs := n.kwargs
	for _, k := range kwargs.EscapeWith("\\'=:").SortedKeys() {
		// NOTE(review): the value is looked up in the original kwargs using
		// the escaped key; presumably this misses when escaping alters the
		// key — confirm against EscapeWith.
		v := getString(kwargs[k])
		if v != "" {
			args = append(args, fmt.Sprintf("%s=%s", k, v))
		} else {
			args = append(args, fmt.Sprintf("%s", k))
		}
	}

	ret := escapeChars(n.name, "\\'=:")
	if len(args) > 0 {
		ret += fmt.Sprintf("=%s", strings.Join(args, ":"))
	}
	return escapeChars(ret, "\\'[],;")
}