Files
supercronic/main.go
Ashley Mathew cfd6038f87 fix(reaper): forkExec error no such file or directory & prometheus exit error
This is the fix described in https://github.com/aptible/supercronic/pull/178
but with minimal changes applied, since we want to limit the scope of changes
of an individual PR.
2025-09-24 14:43:45 -04:00

293 lines
7.0 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/aptible/supercronic/cron"
"github.com/aptible/supercronic/crontab"
"github.com/aptible/supercronic/log/hook"
"github.com/aptible/supercronic/prometheus_metrics"
"github.com/evalphobia/logrus_sentry"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
// Version is the string printed by the -version flag. It defaults to
// "<unset>"; presumably it is overridden at build time (e.g. via the linker)
// -- confirm against the build scripts.
var Version = "<unset>"
// Usage writes a short usage banner naming the running program to standard
// error, followed by the defaults of every flag registered on the default
// command-line flag set.
var Usage = func() {
	program := os.Args[0]
	fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] CRONTAB\n\nAvailable options:\n", program)
	flag.CommandLine.PrintDefaults()
}
// main parses the command-line flags, configures logging, optional Sentry
// reporting, optional Prometheus metrics and optional inotify-based crontab
// watching, and then runs the jobs of the crontab given as the single
// positional argument until a signal from signalList arrives.
//
// SIGUSR2 (delivered externally or injected by the file watcher) stops the
// running jobs and reloads the crontab; any other signal stops the jobs and
// exits.
func main() {
	debug := flag.Bool("debug", false, "enable debug logging")
	quiet := flag.Bool("quiet", false, "do not log informational messages (takes precedence over debug)")
	json := flag.Bool("json", false, "enable JSON logging")
	printVersion := flag.Bool("version", false, "print version and exit")
	test := flag.Bool("test", false, "test crontab (does not run jobs)")
	inotify := flag.Bool("inotify", false, "use inotify to detect crontab file changes")
	// If this flag changes, update forkExec to disable reaping in the child process
	disableReap := flag.Bool("no-reap", false, "disable reaping of dead processes, note: reaping requires pid 1")
	prometheusListen := flag.String(
		"prometheus-listen-address",
		"",
		fmt.Sprintf(
			"give a valid ip[:port] address to expose Prometheus metrics at /metrics (port defaults to %s), "+
				"use 0.0.0.0 for all network interfaces.",
			prometheus_metrics.DefaultPort,
		),
	)
	splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr")
	passthroughLogs := flag.Bool("passthrough-logs", false, "passthrough logs from commands, do not wrap them in Supercronic logging")
	sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
	sentryEnvironmentFlag := flag.String("sentry-environment", "", "specify the application's environment for Sentry error reporting")
	sentryReleaseFlag := flag.String("sentry-release", "", "specify the application's release version for Sentry error reporting")
	sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
	overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
	flag.Parse()

	// Sentry settings: environment variables provide the defaults, flags
	// override them, and -sentry-dsn wins over its -sentryDsn alias.
	sentryDsn := os.Getenv("SENTRY_DSN")
	sentryEnvironment := os.Getenv("SENTRY_ENVIRONMENT")
	sentryRelease := os.Getenv("SENTRY_RELEASE")

	if *sentryAlias != "" {
		sentryDsn = *sentryAlias
	}
	if *sentry != "" {
		sentryDsn = *sentry
	}
	if *sentryEnvironmentFlag != "" {
		sentryEnvironment = *sentryEnvironmentFlag
	}
	if *sentryReleaseFlag != "" {
		sentryRelease = *sentryReleaseFlag
	}

	// -quiet is applied after -debug so that it takes precedence.
	if *debug {
		logrus.SetLevel(logrus.DebugLevel)
	}
	if *quiet {
		logrus.SetLevel(logrus.WarnLevel)
	}

	if *json {
		logrus.SetFormatter(&logrus.JSONFormatter{})
	} else {
		logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})
	}

	if *splitLogs {
		hook.RegisterSplitLogger(
			logrus.StandardLogger(),
			os.Stdout,
			os.Stderr,
		)
	}

	if *printVersion {
		fmt.Println(Version)
		os.Exit(0)
		return
	}

	if flag.NArg() != 1 {
		Usage()
		os.Exit(2)
		return
	}

	if !*disableReap {
		if os.Getpid() == 1 {
			// Clean up zombie processes caused by incorrect crontab commands
			// Use forkExec to avoid random waitid errors
			// https://github.com/aptible/supercronic/issues/88
			// https://github.com/aptible/supercronic/issues/171
			logrus.Info("reaping dead processes")
			forkExec()
			return
		}

		logrus.Warn("process reaping disabled, not pid 1")
	}

	crontabFileName := flag.Args()[0]

	var watcher *fsnotify.Watcher
	if *inotify {
		logrus.Info("using inotify to detect crontab file changes")

		var err error
		watcher, err = fsnotify.NewWatcher()
		if err != nil {
			logrus.Fatal(err)
			return
		}
		defer watcher.Close()

		logrus.Infof("adding file watch for '%s'", crontabFileName)
		if err := watcher.Add(crontabFileName); err != nil {
			logrus.Fatal(err)
			return
		}
	}

	if sentryDsn != "" {
		sentryLevels := []logrus.Level{
			logrus.PanicLevel,
			logrus.FatalLevel,
			logrus.ErrorLevel,
		}

		sentryHook, err := logrus_sentry.NewSentryHook(sentryDsn, sentryLevels)
		if err != nil {
			logrus.Fatalf("Could not init sentry logger: %s", err)
			// Never touch the nil hook below, even if Fatalf does not exit.
			return
		}

		sentryHook.Timeout = 5 * time.Second
		if sentryEnvironment != "" {
			sentryHook.SetEnvironment(sentryEnvironment)
		}
		if sentryRelease != "" {
			sentryHook.SetRelease(sentryRelease)
		}
		logrus.StandardLogger().AddHook(sentryHook)
	}

	promMetrics := prometheus_metrics.NewPrometheusMetrics()

	if *prometheusListen != "" {
		promServerShutdownClosure, err := prometheus_metrics.InitHTTPServer(*prometheusListen, context.Background())
		if err != nil {
			logrus.Fatalf("prometheus http startup failed: %s", err.Error())
			// Do not register a shutdown closure that may be nil.
			return
		}

		defer func() {
			if err := promServerShutdownClosure(); err != nil {
				logrus.Fatalf("prometheus http shutdown failed: %s", err.Error())
			}
		}()
	}

	// termChan receives the OS signals in signalList; the file watcher below
	// also injects a synthetic SIGUSR2 into it to request a reload.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, signalList...)

	if *inotify {
		go func() {
			for {
				select {
				case event, ok := <-watcher.Events:
					if !ok {
						return
					}

					logrus.Debugf("event: %v, watch-list: %v", event, watcher.WatchList())

					// Op is a bitmask and one event may carry several
					// operations, so test the individual bits instead of
					// comparing the whole value. Remove is tested first
					// because it must also re-register the watch.
					switch {
					case event.Op&fsnotify.Remove != 0:
						// workaround for k8s configmap and secret mounts
						logrus.Debug("watched file changed")
						if err := watcher.Add(crontabFileName); err != nil {
							logrus.Fatal(err)
							return
						}
						termChan <- syscall.SIGUSR2
					case event.Op&fsnotify.Write != 0:
						logrus.Debug("watched file changed")
						termChan <- syscall.SIGUSR2
					}

				case err, ok := <-watcher.Errors:
					if !ok {
						return
					}
					logrus.Error("error:", err)
				}
			}
		}()
	}

	// Each iteration loads the crontab, starts its jobs and blocks until a
	// signal arrives; only SIGUSR2 causes another iteration.
	for {
		promMetrics.Reset()

		logrus.Infof("read crontab: %s", crontabFileName)
		tab, err := readCrontabAtPath(crontabFileName)
		if err != nil {
			logrus.Fatal(err)
			break
		}

		if *test {
			logrus.Info("crontab is valid")
			os.Exit(0)
			break
		}

		var wg sync.WaitGroup
		exitCtx, notifyExit := context.WithCancel(context.Background())

		for _, job := range tab.Jobs {
			cronLogger := logrus.WithFields(logrus.Fields{
				"job.schedule": job.Schedule,
				"job.command":  job.Command,
				"job.position": job.Position,
			})

			cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics)
		}

		termSig := <-termChan

		if termSig == syscall.SIGUSR2 {
			logrus.Infof("received %s, reloading crontab", termSig)
		} else {
			logrus.Infof("received %s, shutting down", termSig)
		}

		// Cancel the jobs' exit context, then wait for them to finish.
		notifyExit()

		logrus.Info("waiting for jobs to finish")
		wg.Wait()

		if termSig != syscall.SIGUSR2 {
			logrus.Info("exiting")
			break
		}
	}
}
// readCrontabAtPath opens the file at path and hands it to
// crontab.ParseCrontab, returning the parser's result. The file is closed
// before the function returns. An error from opening the file is returned
// as-is with a nil crontab.
func readCrontabAtPath(path string) (*crontab.Crontab, error) {
	f, openErr := os.Open(path)
	if openErr != nil {
		return nil, openErr
	}
	defer f.Close()

	parsed, parseErr := crontab.ParseCrontab(f)
	return parsed, parseErr
}
// signalList enumerates the signals main subscribes to. main treats SIGUSR2
// as a request to reload the crontab and every other entry as a request to
// shut down.
var signalList = []os.Signal{
	syscall.SIGINT,
	syscall.SIGTERM,
	syscall.SIGQUIT,
	syscall.SIGUSR2,
}