package exec import ( "crypto/md5" "encoding/hex" "errors" "fmt" "net/url" "os" "os/exec" "strings" "sync" "time" "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/rtsp" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/magic" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" "github.com/AlexxIT/go2rtc/pkg/stdin" "github.com/rs/zerolog" ) func Init() { rtsp.HandleFunc(func(conn *pkg.Conn) bool { waitersMu.Lock() waiter := waiters[conn.URL.Path] waitersMu.Unlock() if waiter == nil { return false } // unblocking write to channel select { case waiter <- conn: return true default: return false } }) streams.HandleFunc("exec", execHandle) log = app.GetLogger("exec") } func execHandle(rawURL string) (core.Producer, error) { var path string var query url.Values // RTSP flow should have `{output}` inside URL // pipe flow may have `#{params}` inside URL if i := strings.Index(rawURL, "{output}"); i > 0 { if rtsp.Port == "" { return nil, errors.New("exec: rtsp module disabled") } sum := md5.Sum([]byte(rawURL)) path = "/" + hex.EncodeToString(sum[:]) rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:] } else if i = strings.IndexByte(rawURL, '#'); i > 0 { query = streams.ParseQuery(rawURL[i+1:]) rawURL = rawURL[:i] } args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` cmd := exec.Command(args[0], args[1:]...) cmd.Stderr = &logWriter{ buf: make([]byte, 512), debug: log.Debug().Enabled(), } if path == "" { return handlePipe(rawURL, cmd, query) } return handleRTSP(rawURL, cmd, path) } func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { if query.Get("backchannel") == "1" { return stdin.NewClient(cmd) } r, err := PipeCloser(cmd, query) if err != nil { return nil, err } log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe") ts := time.Now() if err = cmd.Start(); err != nil { return nil, err } prod, err := magic.Open(r) if err != nil { _ = r.Close() } log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run pipe") return prod, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr) } func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { if log.Trace().Enabled() { cmd.Stdout = os.Stdout } waiter := make(chan *pkg.Conn, 1) waitersMu.Lock() waiters[path] = waiter waitersMu.Unlock() defer func() { waitersMu.Lock() delete(waiters, path) waitersMu.Unlock() }() log.Debug().Strs("args", cmd.Args).Msg("[exec] run rtsp") ts := time.Now() if err := cmd.Start(); err != nil { log.Error().Err(err).Str("url", url).Msg("[exec]") return nil, err } done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { case <-time.After(time.Second * 60): _ = cmd.Process.Kill() log.Error().Str("url", url).Msg("[exec] timeout") return nil, errors.New("exec: timeout") case <-done: // limit message size return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr) case prod := <-waiter: log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") prod.OnClose = func() error { log.Debug().Msgf("[exec] kill rtsp") return errors.Join(cmd.Process.Kill(), cmd.Wait()) } return prod, nil } } // internal var ( log zerolog.Logger waiters = make(map[string]chan *pkg.Conn) waitersMu sync.Mutex ) type logWriter struct { buf []byte debug bool n int } func (l *logWriter) String() string { if l.n == len(l.buf) { return string(l.buf) + "..." } return string(l.buf[:l.n]) } func (l *logWriter) Write(p []byte) (n int, err error) { if l.n < cap(l.buf) { l.n += copy(l.buf[l.n:], p) } n = len(p) if l.debug { if p = trimSpace(p); p != nil { log.Debug().Msgf("[exec] %s", p) } } return } func trimSpace(b []byte) []byte { start := 0 stop := len(b) for ; start < stop; start++ { if b[start] >= ' ' { break // trim all ASCII before 0x20 } } for ; ; stop-- { if stop == start { return nil // skip empty output } if b[stop-1] > ' ' { break // trim all ASCII before 0x21 } } return b[start:stop] }