diff --git a/README.md b/README.md index d84ec65..05ff1b1 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,24 @@ see complete example at: [opencv](./examples/opencv.go) result: ![image](./examples/sample_data/face-detect.jpg) +## Set cpu limit/request for ffmpeg-go + +```go +e := ComplexFilterExample("./sample_data/in1.mp4", "./sample_data/overlay.png", "./sample_data/out2.mp4") +err := e.RunWithResource(0.1, 0.5) +if err != nil { + assert.Nil(t, err) +} +``` + +result from command top: we will see ffmpeg used 0.5 core as expected. + +```bash +> top +PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND +1386105 root 20 0 2114152 273780 31672 R 50.2 1.7 0:16.79 ffmpeg +``` + # View Graph function view generate [mermaid](https://mermaid-js.github.io/mermaid/#/) chart, which can be use in markdown or view [online](https://mermaid-js.github.io/mermaid-live-editor/) diff --git a/examples/limitcpu_test.go b/examples/limitcpu_test.go new file mode 100644 index 0000000..126e15a --- /dev/null +++ b/examples/limitcpu_test.go @@ -0,0 +1,33 @@ +//+build linux + +package examples + +import ( + "testing" + + "github.com/stretchr/testify/assert" + ffmpeg "github.com/u2takey/ffmpeg-go" +) + +func ComplexFilterExample(testInputFile, testOverlayFile, testOutputFile string) *ffmpeg.Stream { + split := ffmpeg.Input(testInputFile).VFlip().Split() + split0, split1 := split.Get("0"), split.Get("1") + overlayFile := ffmpeg.Input(testOverlayFile).Crop(10, 10, 158, 112) + return ffmpeg.Concat([]*ffmpeg.Stream{ + split0.Trim(ffmpeg.KwArgs{"start_frame": 10, "end_frame": 20}), + split1.Trim(ffmpeg.KwArgs{"start_frame": 30, "end_frame": 40})}). + Overlay(overlayFile.HFlip(), ""). + DrawBox(50, 50, 120, 120, "red", 5). + Output(testOutputFile). + OverWriteOutput() +} + +// PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND +// 1386105 root 20 0 2114152 273780 31672 R 50.2 1.7 0:16.79 ffmpeg +func TestLimitCpu(t *testing.T) { + e := ComplexFilterExample("./sample_data/in1.mp4", "./sample_data/overlay.png", "./sample_data/out2.mp4") + err := e.WithCpuCoreRequest(0.1).WithCpuCoreLimit(0.5).RunLinux() + if err != nil { + assert.Nil(t, err) + } +} diff --git a/examples/opencv.go b/examples/opencv.go index 646be2b..b74d9d9 100644 --- a/examples/opencv.go +++ b/examples/opencv.go @@ -81,6 +81,8 @@ func openCvProcess(xmlFile string, reader io.ReadCloser, w, h int) { // show the image in the window, and wait 1 millisecond window.IMShow(img2) + img.Close() + img2.Close() if window.WaitKey(10) >= 0 { break } diff --git a/run_linux.go b/run_linux.go new file mode 100644 index 0000000..57ba7a7 --- /dev/null +++ b/run_linux.go @@ -0,0 +1,138 @@ +package ffmpeg_go + +import ( + "context" + "errors" + "io/ioutil" + "os" + "path/filepath" + "strconv" + + "github.com/u2takey/go-utils/rand" +) + +const ( + cgroupConfigKey = "cgroupConfig" + cpuRoot = "/sys/fs/cgroup/cpu,cpuacct" + cpuSetRoot = "/sys/fs/cgroup/cpuset" + procsFile = "cgroup.procs" + cpuSharesFile = "cpu.shares" + cfsPeriodUsFile = "cpu.cfs_period_us" + cfsQuotaUsFile = "cpu.cfs_quota_us" + cpuSetCpusFile = "cpuset.cpus" + cpuSetMemsFile = "cpuset.mems" +) + +type cgroupConfig struct { + cpuRequest float32 + cpuLimit float32 + cpuset string + memset string +} + +func (s *Stream) setCGroupConfig(f func(config *cgroupConfig)) *Stream { + a := s.Context.Value(cgroupConfigKey) + if a == nil { + a = &cgroupConfig{} + } + f(a.(*cgroupConfig)) + s.Context = context.WithValue(s.Context, cgroupConfigKey, a) + return s +} + +func (s *Stream) WithCpuCoreRequest(n float32) *Stream { + return s.setCGroupConfig(func(config *cgroupConfig) { + config.cpuRequest = n + }) +} + +func (s *Stream) WithCpuCoreLimit(n float32) *Stream { + return s.setCGroupConfig(func(config *cgroupConfig) { + config.cpuLimit = n + }) +} + +func (s *Stream) WithCpuSet(n string) *Stream { + return s.setCGroupConfig(func(config *cgroupConfig) { + config.cpuset = n + }) +} + +func (s *Stream) WithMemSet(n string) *Stream { + return s.setCGroupConfig(func(config *cgroupConfig) { + config.memset = n + }) +} + +func writeCGroupFile(rootPath, file string, value string) error { + return ioutil.WriteFile(filepath.Join(rootPath, file), []byte(value), 0755) +} + +func (s *Stream) RunLinux() error { + a := s.Context.Value(cgroupConfigKey).(*cgroupConfig) + if a.cpuRequest > a.cpuLimit { + return errors.New("cpuCoreLimit should greater or equal to cpuCoreRequest") + } + name := "ffmpeg_go_" + rand.String(6) + rootCpuPath, rootCpuSetPath := filepath.Join(cpuRoot, name), filepath.Join(cpuSetRoot, name) + err := os.MkdirAll(rootCpuPath, 0777) + if err != nil { + return err + } + err = os.MkdirAll(rootCpuSetPath, 0777) + if err != nil { + return err + } + defer func() { _ = os.Remove(rootCpuPath); _ = os.Remove(rootCpuSetPath) }() + + share := int(1024 * a.cpuRequest) + period := 100000 + quota := int(a.cpuLimit * 100000) + + if share > 0 { + err = writeCGroupFile(rootCpuPath, cpuSharesFile, strconv.Itoa(share)) + if err != nil { + return err + } + } + err = writeCGroupFile(rootCpuPath, cfsPeriodUsFile, strconv.Itoa(period)) + if err != nil { + return err + } + if quota > 0 { + err = writeCGroupFile(rootCpuPath, cfsQuotaUsFile, strconv.Itoa(quota)) + if err != nil { + return err + } + } + if a.cpuset != "" && a.memset != "" { + err = writeCGroupFile(rootCpuSetPath, cpuSetCpusFile, a.cpuset) + if err != nil { + return err + } + err = writeCGroupFile(rootCpuSetPath, cpuSetMemsFile, a.memset) + if err != nil { + return err + } + } + + cmd := s.Compile() + err = cmd.Start() + if err != nil { + return err + } + if share > 0 || quota > 0 { + err = writeCGroupFile(rootCpuPath, procsFile, strconv.Itoa(cmd.Process.Pid)) + if err != nil { + return err + } + } + if a.cpuset != "" && a.memset != "" { + err = writeCGroupFile(rootCpuSetPath, procsFile, strconv.Itoa(cmd.Process.Pid)) + if err != nil { + return err + } + } + + return cmd.Wait() +}