From 4e0185cfe6e10c469bfdb5a4ccddce9b6f45d007 Mon Sep 17 00:00:00 2001 From: Alex X Date: Mon, 29 Apr 2024 18:34:48 +0300 Subject: [PATCH] Code refactoring after #878 --- README.md | 6 +++ internal/exec/exec.go | 28 +++++------ internal/exec/params.go | 34 -------------- internal/exec/params_linux.go | 88 ----------------------------------- internal/exec/pipe.go | 43 ++++++++++++----- internal/exec/pipe_linux.go | 48 ------------------- internal/streams/helpers.go | 3 ++ 7 files changed, 51 insertions(+), 199 deletions(-) delete mode 100644 internal/exec/params.go delete mode 100644 internal/exec/params_linux.go delete mode 100644 internal/exec/pipe_linux.go diff --git a/README.md b/README.md index aaed9410..f72ab384 100644 --- a/README.md +++ b/README.md @@ -412,11 +412,17 @@ The source can be used with: - [Raspberry Pi Cameras](https://www.raspberrypi.com/documentation/computers/camera_software.html) - any your own software +Pipe commands support two parameters (format: `exec:{command}#{param1}#{param2}`): + +- `killsignal` - signal which will be send to stop the process (numeric form) +- `killtimeout` - time in seconds for forced termination with sigkill + ```yaml streams: stream: exec:ffmpeg -re -i /media/BigBuckBunny.mp4 -c copy -rtsp_transport tcp -f rtsp {output} picam_h264: exec:libcamera-vid -t 0 --inline -o - picam_mjpeg: exec:libcamera-vid -t 0 --codec mjpeg -o - + canon: exec:gphoto2 --capture-movie --stdout#killsignal=2#killtimeout=5 ``` #### Source: Echo diff --git a/internal/exec/exec.go b/internal/exec/exec.go index cf3aa7e6..6202d94c 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -5,8 +5,10 @@ import ( "encoding/hex" "errors" "fmt" + "net/url" "os" "os/exec" + "strings" "sync" "time" @@ -20,12 +22,6 @@ import ( "github.com/rs/zerolog" ) -type Params struct { - KillSignal os.Signal - Command string - KillTimeout time.Duration -} - func Init() { rtsp.HandleFunc(func(conn *pkg.Conn) bool { waitersMu.Lock() @@ -50,22 +46,19 @@ func Init() { log = app.GetLogger("exec") } -func execHandle(url string) (core.Producer, error) { +func execHandle(rawURL string) (core.Producer, error) { var path string - params, err := parseParams(url) - if err != nil { - return nil, err - } + rawURL, rawQuery, _ := strings.Cut(rawURL, "#") - args := shell.QuoteSplit(params.Command[5:]) // remove `exec:` + args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` for i, arg := range args { if arg == "{output}" { if rtsp.Port == "" { return nil, errors.New("rtsp module disabled") } - sum := md5.Sum([]byte(url)) + sum := md5.Sum([]byte(rawURL)) path = "/" + hex.EncodeToString(sum[:]) args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path break @@ -78,14 +71,15 @@ func execHandle(url string) (core.Producer, error) { } if path == "" { - return handlePipe(url, cmd, params) + query := streams.ParseQuery(rawQuery) + return handlePipe(rawURL, cmd, query) } - return handleRTSP(url, path, cmd) + return handleRTSP(rawURL, path, cmd) } -func handlePipe(_ string, cmd *exec.Cmd, params *Params) (core.Producer, error) { - r, err := PipeCloser(cmd, params) +func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { + r, err := PipeCloser(cmd, query) if err != nil { return nil, err } diff --git a/internal/exec/params.go b/internal/exec/params.go deleted file mode 100644 index 3d736a4b..00000000 --- a/internal/exec/params.go +++ /dev/null @@ -1,34 +0,0 @@ -//go:build !linux - -package exec - -import ( - "fmt" - "net/url" - "runtime" - "strings" - - "github.com/AlexxIT/go2rtc/internal/streams" -) - -func parseParams(s string) (*Params, error) { - args := &Params{ - Command: s, - } - - var query url.Values - if i := strings.IndexByte(s, '#'); i > 0 { - query = streams.ParseQuery(s[i+1:]) - args.Command = s[:i] - } - - if _, ok := query["killsignal"]; ok { - return nil, fmt.Errorf("killsignal is not supported this %s", runtime.GOOS) - } - - if _, ok := query["killtimeout"]; ok { - return nil, fmt.Errorf("killtimeout is not supported in %s", runtime.GOOS) - } - - return args, nil -} diff --git a/internal/exec/params_linux.go b/internal/exec/params_linux.go deleted file mode 100644 index 6a7ce444..00000000 --- a/internal/exec/params_linux.go +++ /dev/null @@ -1,88 +0,0 @@ -package exec - -import ( - "fmt" - "net/url" - "os" - "strconv" - "strings" - "syscall" - "time" - - "github.com/AlexxIT/go2rtc/internal/streams" -) - -func parseParams(s string) (*Params, error) { - args := &Params{ - KillSignal: syscall.SIGKILL, - KillTimeout: 5 * time.Second, - Command: s, - } - - var query url.Values - if i := strings.IndexByte(s, '#'); i > 0 { - query = streams.ParseQuery(s[i+1:]) - args.Command = s[:i] - } - - if val, ok := query["killsignal"]; ok { - if sig, err := parseSignal(val[0]); err == nil { - args.KillSignal = sig - } else { - return nil, fmt.Errorf("could not parse killsignal param (%s)", val[0]) - } - } - - if val, ok := query["killtimeout"]; ok { - if i, err := strconv.Atoi(val[0]); err == nil { - args.KillTimeout = time.Duration(i) * time.Second - } else { - return nil, fmt.Errorf("could not convert killtimeout param (%s) to int", val[0]) - } - } - - return args, nil -} - -func parseSignal(signalString string) (os.Signal, error) { - signalMap := map[string]os.Signal{ - "sighup": syscall.SIGHUP, - "sigint": syscall.SIGINT, - "sigquit": syscall.SIGQUIT, - "sigill": syscall.SIGILL, - "sigtrap": syscall.SIGTRAP, - "sigabrt": syscall.SIGABRT, - "sigbus": syscall.SIGBUS, - "sigfpe": syscall.SIGFPE, - "sigkill": syscall.SIGKILL, - "sigusr1": syscall.SIGUSR1, - "sigsegv": syscall.SIGSEGV, - "sigusr2": syscall.SIGUSR2, - "sigpipe": syscall.SIGPIPE, - "sigalrm": syscall.SIGALRM, - "sigterm": syscall.SIGTERM, - "sigchld": syscall.SIGCHLD, - "sigcont": syscall.SIGCONT, - "sigstop": syscall.SIGSTOP, - "sigtstp": syscall.SIGTSTP, - "sigttin": syscall.SIGTTIN, - "sigttou": syscall.SIGTTOU, - "sigurg": syscall.SIGURG, - "sigxcpu": syscall.SIGXCPU, - "sigxfsz": syscall.SIGXFSZ, - "sigvtalrm": syscall.SIGVTALRM, - "sigprof": syscall.SIGPROF, - "sigwinch": syscall.SIGWINCH, - "sigio": syscall.SIGIO, - "sigpoll": syscall.SIGPOLL, - "sigpwr": syscall.SIGPWR, - "sigsys": syscall.SIGSYS, - } - - signalValue, ok := signalMap[strings.ToLower(signalString)] - if !ok { - return nil, fmt.Errorf("invalid signal: %s", signalString) - } - - return signalValue, nil -} diff --git a/internal/exec/pipe.go b/internal/exec/pipe.go index 9c40cccc..12ea136b 100644 --- a/internal/exec/pipe.go +++ b/internal/exec/pipe.go @@ -1,37 +1,56 @@ -//go:build !linux - package exec import ( "bufio" + "errors" "io" + "net/url" "os/exec" + "syscall" + "time" "github.com/AlexxIT/go2rtc/pkg/core" ) // PipeCloser - return StdoutPipe that Kill cmd on Close call -func PipeCloser(cmd *exec.Cmd, params *Params) (io.ReadCloser, error) { +func PipeCloser(cmd *exec.Cmd, query url.Values) (io.ReadCloser, error) { stdout, err := cmd.StdoutPipe() if err != nil { return nil, err } // add buffer for pipe reader to reduce syscall - return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, params}, nil + return &pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, query}, nil } type pipeCloser struct { io.Reader io.Closer - cmd *exec.Cmd - params *Params + cmd *exec.Cmd + query url.Values } -func (p pipeCloser) Close() error { - finished := make(chan bool) - - err := core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait()) - finished <- true - return err +func (p *pipeCloser) Close() error { + return errors.Join(p.Closer.Close(), p.Kill(), p.Wait()) +} + +func (p *pipeCloser) Kill() error { + if s := p.query.Get("killsignal"); s != "" { + log.Trace().Msgf("[exec] kill with custom sig=%s", s) + sig := syscall.Signal(core.Atoi(s)) + return p.cmd.Process.Signal(sig) + } + return p.cmd.Process.Kill() +} + +func (p *pipeCloser) Wait() error { + if s := p.query.Get("killtimeout"); s != "" { + timeout := time.Duration(core.Atoi(s)) * time.Second + timer := time.AfterFunc(timeout, func() { + log.Trace().Msgf("[exec] kill after timeout=%s", s) + _ = p.cmd.Process.Kill() + }) + defer timer.Stop() // stop timer if Wait ends before timeout + } + return p.cmd.Wait() } diff --git a/internal/exec/pipe_linux.go b/internal/exec/pipe_linux.go deleted file mode 100644 index 9b28702e..00000000 --- a/internal/exec/pipe_linux.go +++ /dev/null @@ -1,48 +0,0 @@ -package exec - -import ( - "bufio" - "io" - "os/exec" - "syscall" - "time" - - "github.com/AlexxIT/go2rtc/pkg/core" -) - -// PipeCloser - return StdoutPipe that Kill cmd on Close call -func PipeCloser(cmd *exec.Cmd, params *Params) (io.ReadCloser, error) { - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - - // add buffer for pipe reader to reduce syscall - return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, params}, nil -} - -type pipeCloser struct { - io.Reader - io.Closer - cmd *exec.Cmd - params *Params -} - -func (p pipeCloser) Close() error { - finished := make(chan bool) - - if p.params.KillSignal != syscall.SIGKILL { - go func() { - select { - case <-time.After(p.params.KillTimeout): - p.cmd.Process.Kill() - break - case <-finished: - break - } - }() - } - err := core.Any(p.Closer.Close(), p.cmd.Process.Signal(p.params.KillSignal), p.cmd.Wait()) - finished <- true - return err -} diff --git a/internal/streams/helpers.go b/internal/streams/helpers.go index e59dab77..2ead1aa3 100644 --- a/internal/streams/helpers.go +++ b/internal/streams/helpers.go @@ -6,6 +6,9 @@ import ( ) func ParseQuery(s string) url.Values { + if len(s) == 0 { + return nil + } params := url.Values{} for _, key := range strings.Split(s, "#") { var value string