Files
frankenphp/threadworker.go
Rob Landers 52df300f86 feat: custom workers initial support (#1795)
* create a simple thread framework

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add tests

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix comment

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* remove mention of an old function that no longer exists

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* simplify providing a request

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* satisfy linter

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add error handling and handle shutdowns

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add tests

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* pipes are tied to workers, not threads

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix test

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add a way to detect when a request is completed

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* we never shutdown workers or remove them, so we do not need this

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add more comments

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* Simplify modular threads (#1874)

* Simplify

* remove unused variable

* log thread index

* feat: allow passing parameters to the PHP callback and accessing its return value (#1881)

* fix formatting

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix test compilation

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix segfaults

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* Update frankenphp.c

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

---------

Signed-off-by: Robert Landers <landers.robert@gmail.com>
Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
2025-09-18 09:21:49 +02:00

256 lines
8.8 KiB
Go

package frankenphp
// #include "frankenphp.h"
import "C"
import (
"context"
"log/slog"
"path/filepath"
"time"
"unsafe"
)
// representation of a thread assigned to a worker script
// executes the PHP worker script in a loop
// implements the threadHandler interface
type workerThread struct {
state *threadState
thread *phpThread
worker *worker
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *exponentialBackoff
externalWorker WorkerExtension
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
externalWorker := externalWorkers[worker.name]
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
worker: worker,
backoff: &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: worker.maxConsecutiveFailures,
},
externalWorker: externalWorker,
})
worker.attachThread(thread)
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDrainNotification(handler.thread.threadIndex)
}
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
if handler.externalWorker != nil {
handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex)
}
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
func (handler *workerThread) afterScriptExecution(exitStatus int) {
tearDownWorkerScript(handler, exitStatus)
}
func (handler *workerThread) getRequestContext() *frankenPHPContext {
if handler.workerContext != nil {
return handler.workerContext
}
return handler.dummyContext
}
func (handler *workerThread) name() string {
return "Worker PHP Thread - " + handler.worker.fileName
}
func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.wait()
metrics.StartWorker(worker.name)
if handler.state.is(stateReady) {
metrics.ReadyWorker(handler.worker.name)
}
// Create a dummy request to set up the worker
fc, err := newDummyContext(
filepath.Base(worker.fileName),
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
fc.worker = worker
handler.dummyContext = fc
handler.isBootingScript = true
clearSandboxedEnv(handler.thread)
logger.LogAttrs(context.Background(), slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
}
func tearDownWorkerScript(handler *workerThread, exitStatus int) {
worker := handler.worker
handler.dummyContext = nil
ctx := context.Background()
// if the worker request is not nil, the script might have crashed
// make sure to close the worker request context
if handler.workerContext != nil {
handler.workerContext.closeContext()
handler.workerContext = nil
}
// on exit status 0 we just run the worker script again
if exitStatus == 0 && !handler.isBootingScript {
metrics.StopWorker(worker.name, StopReasonRestart)
handler.backoff.recordSuccess()
logger.LogAttrs(ctx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))
return
}
// worker has thrown a fatal error or has not reached frankenphp_handle_request
metrics.StopWorker(worker.name, StopReasonCrash)
if !handler.isBootingScript {
// fatal error (could be due to exit(1), timeouts, etc.)
logger.LogAttrs(ctx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))
return
}
logger.LogAttrs(ctx, slog.LevelError, "worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
// panic after exponential backoff if the worker has never reached frankenphp_handle_request
if handler.backoff.recordFailure() {
if !watcherIsEnabled && !handler.state.is(stateReady) {
logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount))
panic("too many consecutive worker failures")
}
logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount))
}
}
// waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
func (handler *workerThread) waitForWorkerRequest() (bool, any) {
// unpin any memory left over from previous requests
handler.thread.Unpin()
ctx := context.Background()
logger.LogAttrs(ctx, slog.LevelDebug, "waiting for request", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
// Clear the first dummy request created to initialize the worker
if handler.isBootingScript {
handler.isBootingScript = false
if !C.frankenphp_shutdown_dummy_request() {
panic("Not in CGI context")
}
}
// worker threads are 'ready' after they first reach frankenphp_handle_request()
// 'stateTransitionComplete' is only true on the first boot of the worker script,
// while 'isBootingScript' is true on every boot of the worker script
if handler.state.is(stateTransitionComplete) {
metrics.ReadyWorker(handler.worker.name)
handler.state.set(stateReady)
}
handler.state.markAsWaiting(true)
var fc *frankenPHPContext
select {
case <-handler.thread.drainChan:
logger.LogAttrs(ctx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
// flush the opcache when restarting due to watcher or admin api
// note: this is done right before frankenphp_handle_request() returns 'false'
if handler.state.is(stateRestarting) {
C.frankenphp_reset_opcache()
}
return false, nil
case fc = <-handler.thread.requestChan:
case fc = <-handler.worker.requestChan:
}
handler.workerContext = fc
handler.state.markAsWaiting(false)
logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI))
return true, fc.handlerParameters
}
// go_frankenphp_worker_handle_request_start is called at the start of every php request served.
//
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, unsafe.Pointer) {
handler := phpThreads[threadIndex].handler.(*workerThread)
hasRequest, parameters := handler.waitForWorkerRequest()
if parameters != nil {
p := PHPValue(parameters)
handler.thread.Pin(p)
return C.bool(hasRequest), p
}
return C.bool(hasRequest), nil
}
// go_frankenphp_finish_worker_request is called at the end of every php request served.
//
//export go_frankenphp_finish_worker_request
func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) {
thread := phpThreads[threadIndex]
fc := thread.getRequestContext()
if retval != nil {
fc.handlerReturn = GoValue(unsafe.Pointer(retval))
}
fc.closeContext()
thread.handler.(*workerThread).workerContext = nil
fc.logger.LogAttrs(context.Background(), slog.LevelDebug, "request handling finished", slog.String("worker", fc.scriptFilename), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
}
// when frankenphp_finish_request() is directly called from PHP
//
//export go_frankenphp_finish_php_request
func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
thread := phpThreads[threadIndex]
fc := thread.getRequestContext()
fc.closeContext()
fc.logger.LogAttrs(context.Background(), slog.LevelDebug, "request handling finished", slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
}