Files
frankenphp/worker.go
2024-11-23 11:29:56 +01:00

330 lines
8.1 KiB
Go

package frankenphp
// #include <stdlib.h>
// #include "frankenphp.h"
import "C"
import (
"fmt"
"github.com/dunglas/frankenphp/internal/fastabs"
"net/http"
"path/filepath"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/watcher"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// worker describes one worker script together with the channels and threads
// used to serve requests through it.
type worker struct {
// fileName is the absolute path of the worker script (resolved in newWorker);
// it is also the key under which the worker is stored in the workers map.
fileName string
// num is the number of worker goroutines started for this script by initWorkers.
num int
// env is the prepared environment attached to the worker's main request;
// newWorker always adds the "FRANKENPHP_WORKER\x00" entry to it.
env PreparedEnv
// requestChan is the unbuffered channel shared by all threads of this worker;
// handleRequest sends on it when no thread accepted the request directly.
requestChan chan *http.Request
// threads lists the threads assigned to this worker by assignThreadToWorker.
// Appends and the dispatch loop in handleRequest are guarded by threadMutex.
threads []*phpThread
threadMutex sync.RWMutex
// ready is a channel buffered to num entries on which threads signal that they
// have reached the request loop; initWorkers receives num signals from it.
ready chan struct{}
}
var (
// watcherIsEnabled is set by restartWorkersOnFileChanges when at least one
// directory is watched and cleared by drainWorkers. While it is false, too
// many consecutive worker failures cause a panic in startNewWorkerThread.
watcherIsEnabled bool
// workerShutdownWG tracks running startNewWorkerThread goroutines so that
// callers can wait for all of them to exit.
workerShutdownWG sync.WaitGroup
// workersDone is re-created by initWorkers and closed by stopWorkers to tell
// every worker thread to shut down.
workersDone chan interface{}
// workers holds every registered worker, keyed by absolute script filename.
workers = make(map[string]*worker)
)
// initWorkers creates (or reuses) one worker per entry in opt, starts the
// worker's goroutines, and blocks until every started goroutine has signalled
// readiness on its worker's ready channel.
//
// It replaces workersDone with a fresh open channel, so it can be called again
// after stopWorkers has closed the previous one.
//
// It returns the error from newWorker for the first option that cannot be
// turned into a worker. Workers started for earlier options are left running
// in that case.
func initWorkers(opt []workerOpt) error {
	workersDone = make(chan interface{})
	var ready sync.WaitGroup

	for _, o := range opt {
		w, err := newWorker(o)
		// newWorker returns a nil worker together with a non-nil error, so the
		// error must be checked before w is dereferenced.
		if err != nil {
			return err
		}

		// Start from an empty thread list: a reused worker would otherwise
		// still reference the threads of its previous run.
		w.threads = make([]*phpThread, 0, o.num)

		for i := 0; i < w.num; i++ {
			go w.startNewWorkerThread()
		}

		ready.Add(1)
		go func() {
			defer ready.Done()
			// one readiness signal is expected per started goroutine
			for i := 0; i < w.num; i++ {
				<-w.ready
			}
		}()
	}

	ready.Wait()
	return nil
}
// newWorker returns the worker registered for the script named by o.fileName,
// creating and registering it first when none exists yet.
//
// The script path is made absolute and used as the key in workers. An already
// registered worker is returned unchanged so that its channels survive a
// restart triggered by file changes; the remaining fields of o are ignored in
// that case.
//
// An error is returned when the filename cannot be resolved to an absolute path.
func newWorker(o workerOpt) (*worker, error) {
	scriptPath, err := fastabs.FastAbs(o.fileName)
	if err != nil {
		return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
	}

	if existing, found := workers[scriptPath]; found {
		return existing, nil
	}

	// mark the environment so the script can tell it runs in worker mode
	env := o.env
	if env == nil {
		env = make(PreparedEnv, 1)
	}
	env["FRANKENPHP_WORKER\x00"] = "1"

	created := &worker{
		fileName:    scriptPath,
		num:         o.num,
		env:         env,
		requestChan: make(chan *http.Request),
		ready:       make(chan struct{}, o.num),
	}
	workers[scriptPath] = created

	return created, nil
}
// startNewWorkerThread runs the worker script in a loop, starting it again
// every time it exits, until workersDone has been closed.
//
// The goroutine is registered in workerShutdownWG for its whole lifetime.
//
// It panics when the main request cannot be built, when ServeHTTP returns an
// error, or when backoff.recordFailure reports true while the file watcher is
// disabled.
func (worker *worker) startNewWorkerThread() {
workerShutdownWG.Add(1)
defer workerShutdownWG.Done()
backoff := &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
}
for {
// if the worker can stay up longer than backoff*2, it is probably an application error
// NOTE(review): wait() presumably sleeps for the current backoff delay — confirm in exponentialBackoff
backoff.wait()
metrics.StartWorker(worker.fileName)
// Create the main dummy request: it only carries the script name, document
// root and prepared environment needed to run the worker script
r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
}
// run the worker script; the code below only executes once this call has returned
if err := ServeHTTP(nil, r); err != nil {
panic(err)
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
// if we are done, exit the loop that restarts the worker script
select {
case _, ok := <-workersDone:
// ok is false only when the channel has been closed (see stopWorkers)
if !ok {
metrics.StopWorker(worker.fileName, StopReasonShutdown)
if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
return
}
// a value was received but the channel is still open: keep going
default:
// nothing to receive, the channel is still open: keep going
}
// on exit status 0 we just run the worker script again
if fc.exitStatus == 0 {
// TODO: make the max restart configurable
if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
metrics.StopWorker(worker.fileName, StopReasonRestart)
backoff.recordSuccess()
continue
}
// any non-zero exit status is recorded as a failure; when recordFailure
// reports true we panic unless the file watcher is enabled, in which case
// we only log a warning and keep restarting
if backoff.recordFailure() {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount))
}
metrics.StopWorker(worker.fileName, StopReasonCrash)
}
// unreachable
}
// handleRequest hands r to one of the worker's threads.
//
// Each assigned thread is offered the request in order through its own
// channel, without blocking; the first thread able to receive it wins. When no
// thread takes it, the request is sent on the worker's shared channel instead,
// which blocks until some thread receives it.
func (worker *worker) handleRequest(r *http.Request) {
	delivered := false

	worker.threadMutex.RLock()
	for i := 0; i < len(worker.threads) && !delivered; i++ {
		select {
		case worker.threads[i].requestChan <- r:
			delivered = true
		default:
			// this thread is busy, try the next one
		}
	}
	// release the lock before the potentially blocking send below
	worker.threadMutex.RUnlock()

	if delivered {
		return
	}

	// no thread was immediately available: queue on the shared channel
	// TODO: theoretically there could be autoscaling of threads here
	worker.requestChan <- r
}
// stopWorkers closes workersDone, which tells every worker thread to stop
// instead of waiting for or restarting on another request. It does not wait for
// the threads to exit. The channel is only re-created by initWorkers, so
// calling stopWorkers twice in a row panics on the second close.
func stopWorkers() {
close(workersDone)
}
// drainWorkers shuts the worker subsystem down: it drains the file watcher,
// marks the watcher as disabled, signals all worker threads to stop, waits for
// every worker goroutine to exit, and finally forgets all registered workers.
func drainWorkers() {
watcher.DrainWatcher()
watcherIsEnabled = false
stopWorkers()
workerShutdownWG.Wait()
// start from an empty registry so the next initWorkers creates fresh workers
workers = make(map[string]*worker)
}
// restartWorkersOnFileChanges sets up the file watcher for all directories
// listed in the watch option of the given workers. Whenever the watcher fires,
// all workers are restarted with the same options.
//
// watcherIsEnabled is updated to reflect whether anything is watched; when no
// directory is configured the function returns nil without starting a watcher.
// Any error from watcher.InitWatcher is returned as is.
func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
	var watchDirs []string
	for i := range workerOpts {
		watchDirs = append(watchDirs, workerOpts[i].watch...)
	}

	watcherIsEnabled = len(watchDirs) > 0
	if !watcherIsEnabled {
		return nil
	}

	// callback invoked by the watcher
	onChange := func() {
		restartWorkers(workerOpts)
	}

	err := watcher.InitWatcher(watchDirs, onChange, getLogger())
	if err != nil {
		return err
	}

	return nil
}
// restartWorkers stops all worker threads, waits until every worker goroutine
// has exited, and then initializes the workers again from workerOpts.
//
// It panics with the error from initWorkers when re-initialization fails.
func restartWorkers(workerOpts []workerOpt) {
	stopWorkers()
	workerShutdownWG.Wait()

	err := initWorkers(workerOpts)
	if err == nil {
		logger.Info("workers restarted successfully")
		return
	}

	logger.Error("failed to restart workers when watching files")
	panic(err)
}
// assignThreadToWorker binds thread to the worker registered for the script of
// the thread's main request, gives the thread its own unbuffered request
// channel, and adds the thread to the worker's thread list.
//
// It panics when no worker is registered for that script.
func assignThreadToWorker(thread *phpThread) {
	mainCtx := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
	script := mainCtx.scriptFilename

	owner, found := workers[script]
	if !found {
		panic("worker not found for script: " + script)
	}

	thread.worker = owner
	thread.requestChan = make(chan *http.Request)

	// the thread list is read concurrently by handleRequest
	owner.threadMutex.Lock()
	owner.threads = append(owner.threads, thread)
	owner.threadMutex.Unlock()
}
// go_frankenphp_worker_handle_request_start is exported to C. It blocks until a
// request is available for the worker running on the thread identified by
// threadIndex, or until the workers are told to shut down.
//
// It returns true once a request has been attached to the thread and its
// server context has been updated, and false when workersDone fired, in which
// case the thread is detached from its worker.
//
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
thread := phpThreads[threadIndex]
// we assign a worker to the thread if it doesn't have one already
if thread.worker == nil {
assignThreadToWorker(thread)
}
thread.readiedOnce.Do(func() {
// inform metrics that the worker is ready
metrics.ReadyWorker(thread.worker.fileName)
})
// signal readiness without blocking: initWorkers receives these signals at
// startup; once the channel's buffer is full the signal is simply dropped
select {
case thread.worker.ready <- struct{}{}:
default:
}
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
}
var r *http.Request
// wait for shutdown, a request from the worker's shared channel, or a request
// sent directly to this thread (see handleRequest)
select {
case <-workersDone:
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
}
// detach the thread so that a worker gets assigned again on the next start
thread.worker = nil
C.frankenphp_reset_opcache()
return C.bool(false)
case r = <-thread.worker.requestChan:
case r = <-thread.requestChan:
}
thread.workerRequest = r
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
}
if err := updateServerContext(thread, r, false, true); err != nil {
// Unexpected error
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
}
// reject this request and release what was attached to the thread for it
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
rejectRequest(fc.responseWriter, err.Error())
maybeCloseContext(fc)
thread.workerRequest = nil
thread.Unpin()
// go back to waiting for the next request (or for shutdown)
return go_frankenphp_worker_handle_request_start(threadIndex)
}
return C.bool(true)
}
// go_frankenphp_finish_request is exported to C. It finishes the request
// currently active on the thread identified by threadIndex: the request's
// context is closed if needed and a debug entry is logged.
//
// When isWorkerRequest is true the thread's worker request is detached before
// the context is closed, and the thread is unpinned at the very end.
//
//export go_frankenphp_finish_request
func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
	thread := phpThreads[threadIndex]
	req := thread.getActiveRequest()
	fc := req.Context().Value(contextKey).(*FrankenPHPContext)

	if isWorkerRequest {
		thread.workerRequest = nil
	}
	maybeCloseContext(fc)

	if entry := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); entry != nil {
		// worker requests additionally log the worker script, ahead of the URL
		fields := make([]zap.Field, 0, 2)
		if isWorkerRequest {
			fields = append(fields, zap.String("worker", fc.scriptFilename))
		}
		fields = append(fields, zap.String("url", req.RequestURI))
		entry.Write(fields...)
	}

	if isWorkerRequest {
		thread.Unpin()
	}
}