mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-11 13:20:06 +08:00
feat: add transcode
This commit is contained in:
@@ -1,6 +1,17 @@
|
||||
package transcode
|
||||
|
||||
import "m7s.live/m7s/v5"
|
||||
import (
|
||||
"fmt"
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/config"
|
||||
"m7s.live/m7s/v5/pkg/task"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
flv "m7s.live/m7s/v5/plugin/flv/pkg"
|
||||
"net"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// / 定义传输模式的常量
|
||||
const (
|
||||
@@ -33,17 +44,82 @@ type (
|
||||
)
|
||||
|
||||
func NewTransform() m7s.ITransformer {
|
||||
return &Transformer{}
|
||||
ret := &Transformer{}
|
||||
ret.WriteFlvTag = func(flv net.Buffers) (err error) {
|
||||
select {
|
||||
case ret.rBuf <- flv:
|
||||
default:
|
||||
ret.Warn("pipe input buffer full")
|
||||
}
|
||||
return
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
type Transformer struct {
|
||||
m7s.DefaultTransformer
|
||||
TransRule
|
||||
rBuf chan net.Buffers
|
||||
*util.BufReader
|
||||
flv.Live
|
||||
}
|
||||
|
||||
func (t *Transformer) Start() (err error) {
|
||||
err = t.TransformJob.Subscribe()
|
||||
if err == nil {
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if t.TransformJob.Config.Input != nil {
|
||||
switch v := t.TransformJob.Config.Input.(type) {
|
||||
case map[string]any:
|
||||
config.Parse(&t.TransRule.From, v)
|
||||
case string:
|
||||
t.From.Args = v
|
||||
}
|
||||
}
|
||||
args := append([]string{"-f", "flv"}, strings.Fields(t.From.Args)...)
|
||||
if t.From.Codec != "" {
|
||||
args = append(args, "-c:v", t.From.Codec)
|
||||
}
|
||||
args = append(args, "-i", "pipe:0")
|
||||
t.To = make([]EncodeConfig, len(t.TransformJob.Config.Output))
|
||||
for i, to := range t.TransformJob.Config.Output {
|
||||
var enc EncodeConfig
|
||||
if to.Conf != nil {
|
||||
switch v := to.Conf.(type) {
|
||||
case map[string]any:
|
||||
config.Parse(&enc, v)
|
||||
case string:
|
||||
enc.Args = v
|
||||
}
|
||||
}
|
||||
t.To[i] = enc
|
||||
args = append(args, strings.Fields(enc.Args)...)
|
||||
if strings.HasPrefix(to.Target, "rtmp://") {
|
||||
args = append(args, "-f", "flv", to.Target)
|
||||
} else if strings.HasPrefix(to.Target, "rtsp://") {
|
||||
args = append(args, "-f", "rtsp", to.Target)
|
||||
} else {
|
||||
args = append(args, to.Target)
|
||||
}
|
||||
}
|
||||
t.Description = task.Description{
|
||||
"cmd": args,
|
||||
"config": t.TransRule,
|
||||
}
|
||||
t.rBuf = make(chan net.Buffers, 100)
|
||||
t.BufReader = util.NewBufReaderBuffersChan(t.rBuf)
|
||||
t.Subscriber = t.TransformJob.Subscriber
|
||||
|
||||
var cmdTask CommandTask
|
||||
cmdTask.logFileName = fmt.Sprintf("logs/transcode_%s_%s.log", strings.ReplaceAll(t.TransformJob.StreamPath, "/", "_"), time.Now().Format("20060102150405"))
|
||||
cmdTask.Cmd = exec.CommandContext(t, "ffmpeg", args...)
|
||||
cmdTask.Cmd.Stdin = t.BufReader
|
||||
t.AddTask(&cmdTask)
|
||||
return
|
||||
}
|
||||
|
||||
func (t *Transformer) Dispose() {
|
||||
close(t.rBuf)
|
||||
t.BufReader.Recycle()
|
||||
}
|
||||
|
Reference in New Issue
Block a user