From 6fb59949a24ddd782dbbdebcdc2381a0105fc96e Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 23 Feb 2025 20:56:48 +0300 Subject: [PATCH] Rewrite exec handler --- internal/exec/closer.go | 39 ------------------------ internal/exec/exec.go | 60 +++++++++++++++++++++++-------------- pkg/shell/command.go | 59 ++++++++++++++++++++++++++++++++++++ pkg/shell/procattr.go | 7 +++++ pkg/shell/procattr_linux.go | 6 ++++ pkg/stdin/backchannel.go | 6 +--- pkg/stdin/client.go | 7 ++--- 7 files changed, 113 insertions(+), 71 deletions(-) delete mode 100644 internal/exec/closer.go create mode 100644 pkg/shell/command.go create mode 100644 pkg/shell/procattr.go create mode 100644 pkg/shell/procattr_linux.go diff --git a/internal/exec/closer.go b/internal/exec/closer.go deleted file mode 100644 index 66d0e3ac..00000000 --- a/internal/exec/closer.go +++ /dev/null @@ -1,39 +0,0 @@ -package exec - -import ( - "errors" - "net/url" - "os" - "os/exec" - "syscall" - "time" - - "github.com/AlexxIT/go2rtc/pkg/core" -) - -// closer support custom killsignal with custom killtimeout -type closer struct { - cmd *exec.Cmd - query url.Values -} - -func (c *closer) Close() (err error) { - sig := os.Kill - if s := c.query.Get("killsignal"); s != "" { - sig = syscall.Signal(core.Atoi(s)) - } - - log.Trace().Msgf("[exec] kill with signal=%d", sig) - err = c.cmd.Process.Signal(sig) - - if s := c.query.Get("killtimeout"); s != "" { - timeout := time.Duration(core.Atoi(s)) * time.Second - timer := time.AfterFunc(timeout, func() { - log.Trace().Msgf("[exec] kill after timeout=%s", s) - _ = c.cmd.Process.Kill() - }) - defer timer.Stop() // stop timer if Wait ends before timeout - } - - return errors.Join(err, c.cmd.Wait()) -} diff --git a/internal/exec/exec.go b/internal/exec/exec.go index bce166e8..89add393 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -9,9 +9,9 @@ import ( "io" "net/url" "os" - "os/exec" "strings" "sync" + "syscall" "time" "github.com/AlexxIT/go2rtc/internal/app" @@ -49,7 +49,7 @@ func Init() { log = app.GetLogger("exec") } -func execHandle(rawURL string) (core.Producer, error) { +func execHandle(rawURL string) (prod core.Producer, err error) { rawURL, rawQuery, _ := strings.Cut(rawURL, "#") query := streams.ParseQuery(rawQuery) @@ -67,39 +67,55 @@ func execHandle(rawURL string) (core.Producer, error) { rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:] } - args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` - cmd := exec.Command(args[0], args[1:]...) + cmd := shell.NewCommand(rawURL[5:]) // remove `exec:` cmd.Stderr = &logWriter{ buf: make([]byte, 512), debug: log.Debug().Enabled(), } + if s := query.Get("killsignal"); s != "" { + sig := syscall.Signal(core.Atoi(s)) + cmd.Cancel = func() error { + log.Debug().Msgf("[exec] kill with signal=%d", sig) + return cmd.Process.Signal(sig) + } + } + + if s := query.Get("killtimeout"); s != "" { + cmd.WaitDelay = time.Duration(core.Atoi(s)) * time.Second + } + if query.Get("backchannel") == "1" { return stdin.NewClient(cmd) } - cl := &closer{cmd: cmd, query: query} - if path == "" { - return handlePipe(rawURL, cmd, cl) + prod, err = handlePipe(rawURL, cmd) + } else { + prod, err = handleRTSP(rawURL, cmd, path) } - return handleRTSP(rawURL, cmd, cl, path) + if err != nil { + _ = cmd.Close() + } + + return } -func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) { +func handlePipe(source string, cmd *shell.Command) (core.Producer, error) { stdout, err := cmd.StdoutPipe() if err != nil { return nil, err } - rc := struct { + rd := struct { io.Reader io.Closer }{ // add buffer for pipe reader to reduce syscall bufio.NewReaderSize(stdout, core.BufferSize), - cl, + // stop cmd on close pipe call + cmd, } log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe") @@ -110,9 +126,8 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro return nil, err } - prod, err := magic.Open(rc) + prod, err := magic.Open(rd) if err != nil { - _ = rc.Close() return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr) } @@ -126,7 +141,7 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro return prod, nil } -func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) { +func handleRTSP(source string, cmd *shell.Command, path string) (core.Producer, error) { if log.Trace().Enabled() { cmd.Stdout = os.Stdout } @@ -152,23 +167,22 @@ func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.P return nil, err } - done := make(chan error, 1) - go func() { - done <- cmd.Wait() - }() + timeout := time.NewTimer(30 * time.Second) + defer timeout.Stop() select { - case <-time.After(time.Minute): + case <-timeout.C: + // haven't received data from app in timeout log.Error().Str("source", source).Msg("[exec] timeout") - _ = cl.Close() return nil, errors.New("exec: timeout") - case <-done: - // limit message size + case <-cmd.Done(): + // app fail before we receive any data return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr) case prod := <-waiter: + // app started successfully log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") setRemoteInfo(prod, source, cmd.Args) - prod.OnClose = cl.Close + prod.OnClose = cmd.Close return prod, nil } } diff --git a/pkg/shell/command.go b/pkg/shell/command.go new file mode 100644 index 00000000..b7c81899 --- /dev/null +++ b/pkg/shell/command.go @@ -0,0 +1,59 @@ +package shell + +import ( + "context" + "os/exec" +) + +// Command like exec.Cmd, but with support: +// - io.Closer interface +// - Wait from multiple places +// - Done channel +type Command struct { + *exec.Cmd + ctx context.Context + cancel context.CancelFunc + err error +} + +func NewCommand(s string) *Command { + ctx, cancel := context.WithCancel(context.Background()) + args := QuoteSplit(s) + cmd := exec.CommandContext(ctx, args[0], args[1:]...) + cmd.SysProcAttr = procAttr + return &Command{cmd, ctx, cancel, nil} +} + +func (c *Command) Start() error { + if err := c.Cmd.Start(); err != nil { + return err + } + + go func() { + c.err = c.Cmd.Wait() + c.cancel() // release context resources + }() + + return nil +} + +func (c *Command) Wait() error { + <-c.ctx.Done() + return c.err +} + +func (c *Command) Run() error { + if err := c.Start(); err != nil { + return err + } + return c.Wait() +} + +func (c *Command) Done() <-chan struct{} { + return c.ctx.Done() +} + +func (c *Command) Close() error { + c.cancel() + return nil +} diff --git a/pkg/shell/procattr.go b/pkg/shell/procattr.go new file mode 100644 index 00000000..fffdc2a4 --- /dev/null +++ b/pkg/shell/procattr.go @@ -0,0 +1,7 @@ +//go:build !linux + +package shell + +import "syscall" + +var procAttr *syscall.SysProcAttr diff --git a/pkg/shell/procattr_linux.go b/pkg/shell/procattr_linux.go new file mode 100644 index 00000000..cef1d152 --- /dev/null +++ b/pkg/shell/procattr_linux.go @@ -0,0 +1,6 @@ +package shell + +import "syscall" + +// will stop child if parent died (even with SIGKILL) +var procAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} diff --git a/pkg/stdin/backchannel.go b/pkg/stdin/backchannel.go index b9a4a6d4..b154a291 100644 --- a/pkg/stdin/backchannel.go +++ b/pkg/stdin/backchannel.go @@ -2,7 +2,6 @@ package stdin import ( "encoding/json" - "errors" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" @@ -42,10 +41,7 @@ func (c *Client) Stop() (err error) { if c.sender != nil { c.sender.Close() } - if c.cmd.Process == nil { - return nil - } - return errors.Join(c.cmd.Process.Kill(), c.cmd.Wait()) + return c.cmd.Close() } func (c *Client) MarshalJSON() ([]byte, error) { diff --git a/pkg/stdin/client.go b/pkg/stdin/client.go index 09e525ad..a77d4459 100644 --- a/pkg/stdin/client.go +++ b/pkg/stdin/client.go @@ -1,21 +1,20 @@ package stdin import ( - "os/exec" - "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" ) // Deprecated: should be rewritten to core.Connection type Client struct { - cmd *exec.Cmd + cmd *shell.Command medias []*core.Media sender *core.Sender send int } -func NewClient(cmd *exec.Cmd) (*Client, error) { +func NewClient(cmd *shell.Command) (*Client, error) { c := &Client{ cmd: cmd, medias: []*core.Media{