Rewrite exec handler

This commit is contained in:
Alex X
2025-02-23 20:56:48 +03:00
parent 7d41dc21c1
commit 6fb59949a2
7 changed files with 113 additions and 71 deletions

View File

@@ -1,39 +0,0 @@
package exec
import (
"errors"
"net/url"
"os"
"os/exec"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
// closer support custom killsignal with custom killtimeout
type closer struct {
cmd *exec.Cmd
query url.Values
}
func (c *closer) Close() (err error) {
sig := os.Kill
if s := c.query.Get("killsignal"); s != "" {
sig = syscall.Signal(core.Atoi(s))
}
log.Trace().Msgf("[exec] kill with signal=%d", sig)
err = c.cmd.Process.Signal(sig)
if s := c.query.Get("killtimeout"); s != "" {
timeout := time.Duration(core.Atoi(s)) * time.Second
timer := time.AfterFunc(timeout, func() {
log.Trace().Msgf("[exec] kill after timeout=%s", s)
_ = c.cmd.Process.Kill()
})
defer timer.Stop() // stop timer if Wait ends before timeout
}
return errors.Join(err, c.cmd.Wait())
}

View File

@@ -9,9 +9,9 @@ import (
"io" "io"
"net/url" "net/url"
"os" "os"
"os/exec"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/app"
@@ -49,7 +49,7 @@ func Init() {
log = app.GetLogger("exec") log = app.GetLogger("exec")
} }
func execHandle(rawURL string) (core.Producer, error) { func execHandle(rawURL string) (prod core.Producer, err error) {
rawURL, rawQuery, _ := strings.Cut(rawURL, "#") rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
query := streams.ParseQuery(rawQuery) query := streams.ParseQuery(rawQuery)
@@ -67,39 +67,55 @@ func execHandle(rawURL string) (core.Producer, error) {
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:] rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
} }
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` cmd := shell.NewCommand(rawURL[5:]) // remove `exec:`
cmd := exec.Command(args[0], args[1:]...)
cmd.Stderr = &logWriter{ cmd.Stderr = &logWriter{
buf: make([]byte, 512), buf: make([]byte, 512),
debug: log.Debug().Enabled(), debug: log.Debug().Enabled(),
} }
if s := query.Get("killsignal"); s != "" {
sig := syscall.Signal(core.Atoi(s))
cmd.Cancel = func() error {
log.Debug().Msgf("[exec] kill with signal=%d", sig)
return cmd.Process.Signal(sig)
}
}
if s := query.Get("killtimeout"); s != "" {
cmd.WaitDelay = time.Duration(core.Atoi(s)) * time.Second
}
if query.Get("backchannel") == "1" { if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd) return stdin.NewClient(cmd)
} }
cl := &closer{cmd: cmd, query: query}
if path == "" { if path == "" {
return handlePipe(rawURL, cmd, cl) prod, err = handlePipe(rawURL, cmd)
} else {
prod, err = handleRTSP(rawURL, cmd, path)
} }
return handleRTSP(rawURL, cmd, cl, path) if err != nil {
_ = cmd.Close()
}
return
} }
func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) { func handlePipe(source string, cmd *shell.Command) (core.Producer, error) {
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return nil, err return nil, err
} }
rc := struct { rd := struct {
io.Reader io.Reader
io.Closer io.Closer
}{ }{
// add buffer for pipe reader to reduce syscall // add buffer for pipe reader to reduce syscall
bufio.NewReaderSize(stdout, core.BufferSize), bufio.NewReaderSize(stdout, core.BufferSize),
cl, // stop cmd on close pipe call
cmd,
} }
log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe") log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe")
@@ -110,9 +126,8 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro
return nil, err return nil, err
} }
prod, err := magic.Open(rc) prod, err := magic.Open(rd)
if err != nil { if err != nil {
_ = rc.Close()
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr) return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
} }
@@ -126,7 +141,7 @@ func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, erro
return prod, nil return prod, nil
} }
func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) { func handleRTSP(source string, cmd *shell.Command, path string) (core.Producer, error) {
if log.Trace().Enabled() { if log.Trace().Enabled() {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
} }
@@ -152,23 +167,22 @@ func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.P
return nil, err return nil, err
} }
done := make(chan error, 1) timeout := time.NewTimer(30 * time.Second)
go func() { defer timeout.Stop()
done <- cmd.Wait()
}()
select { select {
case <-time.After(time.Minute): case <-timeout.C:
// haven't received data from app in timeout
log.Error().Str("source", source).Msg("[exec] timeout") log.Error().Str("source", source).Msg("[exec] timeout")
_ = cl.Close()
return nil, errors.New("exec: timeout") return nil, errors.New("exec: timeout")
case <-done: case <-cmd.Done():
// limit message size // app fail before we receive any data
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr) return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter: case prod := <-waiter:
// app started successfully
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args) setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = cl.Close prod.OnClose = cmd.Close
return prod, nil return prod, nil
} }
} }

59
pkg/shell/command.go Normal file
View File

@@ -0,0 +1,59 @@
package shell
import (
"context"
"os/exec"
)
// Command like exec.Cmd, but with support:
// - io.Closer interface
// - Wait from multiple places
// - Done channel
type Command struct {
*exec.Cmd
ctx context.Context
cancel context.CancelFunc
err error
}
func NewCommand(s string) *Command {
ctx, cancel := context.WithCancel(context.Background())
args := QuoteSplit(s)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.SysProcAttr = procAttr
return &Command{cmd, ctx, cancel, nil}
}
func (c *Command) Start() error {
if err := c.Cmd.Start(); err != nil {
return err
}
go func() {
c.err = c.Cmd.Wait()
c.cancel() // release context resources
}()
return nil
}
func (c *Command) Wait() error {
<-c.ctx.Done()
return c.err
}
func (c *Command) Run() error {
if err := c.Start(); err != nil {
return err
}
return c.Wait()
}
func (c *Command) Done() <-chan struct{} {
return c.ctx.Done()
}
func (c *Command) Close() error {
c.cancel()
return nil
}

7
pkg/shell/procattr.go Normal file
View File

@@ -0,0 +1,7 @@
//go:build !linux
package shell
import "syscall"
var procAttr *syscall.SysProcAttr

View File

@@ -0,0 +1,6 @@
package shell
import "syscall"
// will stop child if parent died (even with SIGKILL)
var procAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}

View File

@@ -2,7 +2,6 @@ package stdin
import ( import (
"encoding/json" "encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -42,10 +41,7 @@ func (c *Client) Stop() (err error) {
if c.sender != nil { if c.sender != nil {
c.sender.Close() c.sender.Close()
} }
if c.cmd.Process == nil { return c.cmd.Close()
return nil
}
return errors.Join(c.cmd.Process.Kill(), c.cmd.Wait())
} }
func (c *Client) MarshalJSON() ([]byte, error) { func (c *Client) MarshalJSON() ([]byte, error) {

View File

@@ -1,21 +1,20 @@
package stdin package stdin
import ( import (
"os/exec"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
) )
// Deprecated: should be rewritten to core.Connection // Deprecated: should be rewritten to core.Connection
type Client struct { type Client struct {
cmd *exec.Cmd cmd *shell.Command
medias []*core.Media medias []*core.Media
sender *core.Sender sender *core.Sender
send int send int
} }
func NewClient(cmd *exec.Cmd) (*Client, error) { func NewClient(cmd *shell.Command) (*Client, error) {
c := &Client{ c := &Client{
cmd: cmd, cmd: cmd,
medias: []*core.Media{ medias: []*core.Media{