refactor: decouple worker threads from non-worker threads (#1137)

* Decouple workers.

* Moves code to separate file.

* Cleans up the exponential backoff.

* Initial working implementation.

* Refactors php threads to take callbacks.

* Cleanup.

* Cleanup.

* Cleanup.

* Cleanup.

* Adjusts watcher logic.

* Adjusts the watcher logic.

* Fix opcache_reset race condition.

* Fixing merge conflicts and formatting.

* Prevents overlapping of TSRM reservation and script execution.

* Adjustments as suggested by @dunglas.

* Adds error assertions.

* Adds comments.

* Removes logs and explicitly compares to C.false.

* Resets check.

* Adds cast for safety.

* Fixes waitgroup overflow.

* Resolves waitgroup race condition on startup.

* Moves worker request logic to worker.go.

* Removes defer.

* Removes call from go to c.

* Fixes merge conflict.

* Adds fibers test back in.

* Refactors new thread loop approach.

* Removes redundant check.

* Adds compareAndSwap.

* Refactor: removes global waitgroups and uses a 'thread state' abstraction instead.

* Removes unnecessary method.

* Updates comment.

* Removes unnecessary booleans.

* test

* First state machine steps.

* Splits threads.

* Minimal working implementation with broken tests.

* Fixes tests.

* Refactoring.

* Fixes merge conflicts.

* Formatting

* C formatting.

* More cleanup.

* Allows for clean state transitions.

* Adds state tests.

* Adds support for thread transitioning.

* Fixes the testdata path.

* Formatting.

* Allows transitioning back to inactive state.

* Fixes go linting.

* Formatting.

* Removes duplication.

* Applies suggestions by @dunglas

* Removes redundant check.

* Locks the handler on restart.

* Removes unnecessary log.

* Changes Unpin() logic as suggested by @withinboredom

* Adds suggestions by @dunglas and resolves TODO.

* Makes restarts fully safe.

* Will make the initial startup fail even if the watcher is enabled (as is currently the case)

* Also adds compareAndSwap to the test.

* Adds comment.

* Prevents panic on initial watcher startup.
This commit is contained in:
Alliballibaba2
2024-12-17 11:28:51 +01:00
committed by GitHub
parent 2676bffa98
commit f592e0f47b
20 changed files with 1082 additions and 465 deletions

2
cgi.go
View File

@@ -227,8 +227,6 @@ func go_frankenphp_release_known_variable_keys(threadIndex C.uintptr_t) {
for _, v := range thread.knownVariableKeys {
C.frankenphp_release_zend_string(v)
}
// release everything that might still be pinned to the thread
thread.Unpin()
thread.knownVariableKeys = nil
}

View File

@@ -89,7 +89,7 @@ static void frankenphp_free_request_context() {
free(ctx->cookie_data);
ctx->cookie_data = NULL;
/* Is freed via thread.Unpin() at the end of each request */
/* Is freed via thread.Unpin() */
SG(request_info).auth_password = NULL;
SG(request_info).auth_user = NULL;
SG(request_info).request_method = NULL;
@@ -243,7 +243,7 @@ PHP_FUNCTION(frankenphp_finish_request) { /* {{{ */
php_header();
if (ctx->has_active_request) {
go_frankenphp_finish_request(thread_index, false);
go_frankenphp_finish_php_request(thread_index);
}
ctx->finished = true;
@@ -443,7 +443,7 @@ PHP_FUNCTION(frankenphp_handle_request) {
frankenphp_worker_request_shutdown();
ctx->has_active_request = false;
go_frankenphp_finish_request(thread_index, true);
go_frankenphp_finish_worker_request(thread_index);
RETURN_TRUE;
}
@@ -811,9 +811,9 @@ static void set_thread_name(char *thread_name) {
}
static void *php_thread(void *arg) {
char thread_name[16] = {0};
snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg);
thread_index = (uintptr_t)arg;
char thread_name[16] = {0};
snprintf(thread_name, 16, "php-%" PRIxPTR, thread_index);
set_thread_name(thread_name);
#ifdef ZTS
@@ -832,7 +832,11 @@ static void *php_thread(void *arg) {
cfg_get_string("filter.default", &default_filter);
should_filter_var = default_filter != NULL;
while (go_handle_request(thread_index)) {
// loop until Go signals to stop
char *scriptName = NULL;
while ((scriptName = go_frankenphp_before_script_execution(thread_index))) {
go_frankenphp_after_script_execution(thread_index,
frankenphp_execute_script(scriptName));
}
go_frankenphp_release_known_variable_keys(thread_index);
@@ -841,6 +845,8 @@ static void *php_thread(void *arg) {
ts_free_thread();
#endif
go_frankenphp_on_thread_shutdown(thread_index);
return NULL;
}
@@ -858,13 +864,11 @@ static void *php_main(void *arg) {
exit(EXIT_FAILURE);
}
intptr_t num_threads = (intptr_t)arg;
set_thread_name("php-main");
#ifdef ZTS
#if (PHP_VERSION_ID >= 80300)
php_tsrm_startup_ex(num_threads);
php_tsrm_startup_ex((intptr_t)arg);
#else
php_tsrm_startup();
#endif
@@ -892,28 +896,7 @@ static void *php_main(void *arg) {
frankenphp_sapi_module.startup(&frankenphp_sapi_module);
pthread_t *threads = malloc(num_threads * sizeof(pthread_t));
if (threads == NULL) {
perror("malloc failed");
exit(EXIT_FAILURE);
}
for (uintptr_t i = 0; i < num_threads; i++) {
if (pthread_create(&(*(threads + i)), NULL, &php_thread, (void *)i) != 0) {
perror("failed to create PHP thread");
free(threads);
exit(EXIT_FAILURE);
}
}
for (int i = 0; i < num_threads; i++) {
if (pthread_join((*(threads + i)), NULL) != 0) {
perror("failed to join PHP thread");
free(threads);
exit(EXIT_FAILURE);
}
}
free(threads);
go_frankenphp_main_thread_is_ready();
/* channel closed, shutdown gracefully */
frankenphp_sapi_module.shutdown(&frankenphp_sapi_module);
@@ -929,25 +912,30 @@ static void *php_main(void *arg) {
frankenphp_sapi_module.ini_entries = NULL;
}
#endif
go_shutdown();
go_frankenphp_shutdown_main_thread();
return NULL;
}
int frankenphp_init(int num_threads) {
int frankenphp_new_main_thread(int num_threads) {
pthread_t thread;
if (pthread_create(&thread, NULL, &php_main, (void *)(intptr_t)num_threads) !=
0) {
go_shutdown();
return -1;
}
return pthread_detach(thread);
}
bool frankenphp_new_php_thread(uintptr_t thread_index) {
pthread_t thread;
if (pthread_create(&thread, NULL, &php_thread, (void *)thread_index) != 0) {
return false;
}
pthread_detach(thread);
return true;
}
int frankenphp_request_startup() {
if (php_request_startup() == SUCCESS) {
return SUCCESS;
@@ -960,8 +948,6 @@ int frankenphp_request_startup() {
int frankenphp_execute_script(char *file_name) {
if (frankenphp_request_startup() == FAILURE) {
free(file_name);
file_name = NULL;
return FAILURE;
}
@@ -970,8 +956,6 @@ int frankenphp_execute_script(char *file_name) {
zend_file_handle file_handle;
zend_stream_init_filename(&file_handle, file_name);
free(file_name);
file_name = NULL;
file_handle.primary_script = 1;

View File

@@ -64,8 +64,6 @@ var (
ScriptExecutionError = errors.New("error during PHP script execution")
requestChan chan *http.Request
done chan struct{}
shutdownWG sync.WaitGroup
loggerMu sync.RWMutex
logger *zap.Logger
@@ -123,7 +121,7 @@ type FrankenPHPContext struct {
closed sync.Once
responseWriter http.ResponseWriter
exitStatus C.int
exitStatus int
done chan interface{}
startedAt time.Time
@@ -244,7 +242,7 @@ func Config() PHPConfig {
// MaxThreads is internally used during tests. It is written to, but never read and may go away in the future.
var MaxThreads int
func calculateMaxThreads(opt *opt) error {
func calculateMaxThreads(opt *opt) (int, int, error) {
maxProcs := runtime.GOMAXPROCS(0) * 2
var numWorkers int
@@ -266,13 +264,13 @@ func calculateMaxThreads(opt *opt) error {
opt.numThreads = maxProcs
}
} else if opt.numThreads <= numWorkers {
return NotEnoughThreads
return opt.numThreads, numWorkers, NotEnoughThreads
}
metrics.TotalThreads(opt.numThreads)
MaxThreads = opt.numThreads
return nil
return opt.numThreads, numWorkers, nil
}
// Init starts the PHP runtime and the configured workers.
@@ -311,7 +309,7 @@ func Init(options ...Option) error {
metrics = opt.metrics
}
err := calculateMaxThreads(opt)
totalThreadCount, workerThreadCount, err := calculateMaxThreads(opt)
if err != nil {
return err
}
@@ -327,29 +325,26 @@ func Init(options ...Option) error {
logger.Warn(`Zend Max Execution Timers are not enabled, timeouts (e.g. "max_execution_time") are disabled, recompile PHP with the "--enable-zend-max-execution-timers" configuration option to fix this issue`)
}
} else {
opt.numThreads = 1
totalThreadCount = 1
logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`)
}
shutdownWG.Add(1)
done = make(chan struct{})
requestChan = make(chan *http.Request, opt.numThreads)
initPHPThreads(opt.numThreads)
if err := initPHPThreads(totalThreadCount); err != nil {
return err
}
if C.frankenphp_init(C.int(opt.numThreads)) != 0 {
return MainThreadCreationError
for i := 0; i < totalThreadCount-workerThreadCount; i++ {
thread := getInactivePHPThread()
convertToRegularThread(thread)
}
if err := initWorkers(opt.workers); err != nil {
return err
}
if err := restartWorkersOnFileChanges(opt.workers); err != nil {
return err
}
if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil {
c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", opt.numThreads))
c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", totalThreadCount))
}
if EmbeddedAppPath != "" {
if c := logger.Check(zapcore.InfoLevel, "embedded PHP app 📦"); c != nil {
@@ -363,7 +358,7 @@ func Init(options ...Option) error {
// Shutdown stops the workers and the PHP runtime.
func Shutdown() {
drainWorkers()
drainThreads()
drainPHPThreads()
metrics.Shutdown()
requestChan = nil
@@ -375,17 +370,6 @@ func Shutdown() {
logger.Debug("FrankenPHP shut down")
}
//export go_shutdown
func go_shutdown() {
shutdownWG.Done()
}
func drainThreads() {
close(done)
shutdownWG.Wait()
phpThreads = nil
}
func getLogger() *zap.Logger {
loggerMu.RLock()
defer loggerMu.RUnlock()
@@ -466,9 +450,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
return nil
}
shutdownWG.Add(1)
defer shutdownWG.Done()
fc, ok := FromContext(request.Context())
if !ok {
return InvalidRequestError
@@ -477,76 +458,25 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
fc.responseWriter = responseWriter
fc.startedAt = time.Now()
isWorker := fc.responseWriter == nil
// Detect if a worker is available to handle this request
if !isWorker {
if worker, ok := workers[fc.scriptFilename]; ok {
metrics.StartWorkerRequest(fc.scriptFilename)
worker.handleRequest(request)
<-fc.done
metrics.StopWorkerRequest(fc.scriptFilename, time.Since(fc.startedAt))
return nil
} else {
metrics.StartRequest()
}
if worker, ok := workers[fc.scriptFilename]; ok {
worker.handleRequest(request, fc)
return nil
}
metrics.StartRequest()
select {
case <-done:
case <-mainThread.done:
case requestChan <- request:
<-fc.done
}
if !isWorker {
metrics.StopRequest()
}
metrics.StopRequest()
return nil
}
//export go_handle_request
func go_handle_request(threadIndex C.uintptr_t) bool {
select {
case <-done:
return false
case r := <-requestChan:
thread := phpThreads[threadIndex]
thread.mainRequest = r
fc, ok := FromContext(r.Context())
if !ok {
panic(InvalidRequestError)
}
defer func() {
maybeCloseContext(fc)
thread.mainRequest = nil
thread.Unpin()
}()
if err := updateServerContext(thread, r, true, false); err != nil {
rejectRequest(fc.responseWriter, err.Error())
return true
}
// scriptFilename is freed in frankenphp_execute_script()
fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename))
if fc.exitStatus < 0 {
panic(ScriptExecutionError)
}
// if the script has errored or timed out, make sure any pending worker requests are closed
if fc.exitStatus > 0 && thread.workerRequest != nil {
fc := thread.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
thread.workerRequest = nil
}
return true
}
}
func maybeCloseContext(fc *FrankenPHPContext) {
fc.closed.Do(func() {
close(fc.done)
@@ -598,7 +528,7 @@ func go_apache_request_headers(threadIndex C.uintptr_t, hasActiveRequest bool) (
if !hasActiveRequest {
// worker mode, not handling a request
mfc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
mfc := thread.getActiveRequest().Context().Value(contextKey).(*FrankenPHPContext)
if c := mfc.logger.Check(zapcore.DebugLevel, "apache_request_headers() called in non-HTTP context"); c != nil {
c.Write(zap.String("worker", mfc.scriptFilename))
@@ -784,21 +714,11 @@ func freeArgs(argv []*C.char) {
}
}
func executePHPFunction(functionName string) {
func executePHPFunction(functionName string) bool {
cFunctionName := C.CString(functionName)
defer C.free(unsafe.Pointer(cFunctionName))
success := C.frankenphp_execute_php_function(cFunctionName)
if success == 1 {
if c := logger.Check(zapcore.DebugLevel, "php function call successful"); c != nil {
c.Write(zap.String("function", functionName))
}
} else {
if c := logger.Check(zapcore.ErrorLevel, "php function call failed"); c != nil {
c.Write(zap.String("function", functionName))
}
}
return C.frankenphp_execute_php_function(cFunctionName) == 1
}
// Ensure that the request path does not contain null bytes

View File

@@ -40,7 +40,8 @@ typedef struct frankenphp_config {
} frankenphp_config;
frankenphp_config frankenphp_get_config();
int frankenphp_init(int num_threads);
int frankenphp_new_main_thread(int num_threads);
bool frankenphp_new_php_thread(uintptr_t thread_index);
int frankenphp_update_server_context(
bool create, bool has_main_request, bool has_active_request,

View File

@@ -36,22 +36,17 @@ ZEND_FUNCTION(frankenphp_finish_request);
ZEND_FUNCTION(frankenphp_request_headers);
ZEND_FUNCTION(frankenphp_response_headers);
// clang-format off
static const zend_function_entry ext_functions[] = {
ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request)
ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE(
frankenphp_finish_request, arginfo_frankenphp_finish_request)
ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request,
arginfo_fastcgi_finish_request)
ZEND_FE(frankenphp_request_headers,
arginfo_frankenphp_request_headers)
ZEND_FALIAS(apache_request_headers,
frankenphp_request_headers,
arginfo_apache_request_headers)
ZEND_FALIAS(getallheaders, frankenphp_request_headers,
arginfo_getallheaders)
ZEND_FE(frankenphp_response_headers,
arginfo_frankenphp_response_headers)
ZEND_FALIAS(apache_response_headers,
frankenphp_response_headers,
arginfo_apache_response_headers)
ZEND_FE_END};
ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request)
ZEND_FE(headers_send, arginfo_headers_send)
ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request)
ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request)
ZEND_FE(frankenphp_request_headers, arginfo_frankenphp_request_headers)
ZEND_FALIAS(apache_request_headers, frankenphp_request_headers, arginfo_apache_request_headers)
ZEND_FALIAS(getallheaders, frankenphp_request_headers, arginfo_getallheaders)
ZEND_FE(frankenphp_response_headers, arginfo_frankenphp_response_headers)
ZEND_FALIAS(apache_response_headers, frankenphp_response_headers, arginfo_apache_response_headers)
ZEND_FE_END
};
// clang-format on

View File

@@ -592,6 +592,23 @@ func testFiberNoCgo(t *testing.T, opts *testOptions) {
}, opts)
}
func TestFiberBasic_module(t *testing.T) { testFiberBasic(t, &testOptions{}) }
func TestFiberBasic_worker(t *testing.T) {
testFiberBasic(t, &testOptions{workerScript: "fiber-basic.php"})
}
func testFiberBasic(t *testing.T, opts *testOptions) {
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
req := httptest.NewRequest("GET", fmt.Sprintf("http://example.com/fiber-basic.php?i=%d", i), nil)
w := httptest.NewRecorder()
handler(w, req)
resp := w.Result()
body, _ := io.ReadAll(resp.Body)
assert.Equal(t, string(body), fmt.Sprintf("Fiber %d", i))
}, opts)
}
func TestRequestHeaders_module(t *testing.T) { testRequestHeaders(t, &testOptions{}) }
func TestRequestHeaders_worker(t *testing.T) {
testRequestHeaders(t, &testOptions{workerScript: "request-headers.php"})

108
phpmainthread.go Normal file
View File

@@ -0,0 +1,108 @@
package frankenphp
// #include "frankenphp.h"
import "C"
import (
"sync"
"go.uber.org/zap"
)
// represents the main PHP thread
// the thread needs to keep running as long as all other threads are running
type phpMainThread struct {
state *threadState
done chan struct{}
numThreads int
}
var (
phpThreads []*phpThread
mainThread *phpMainThread
)
// reserve a fixed number of PHP threads on the Go side
func initPHPThreads(numThreads int) error {
mainThread = &phpMainThread{
state: newThreadState(),
done: make(chan struct{}),
numThreads: numThreads,
}
phpThreads = make([]*phpThread, numThreads)
if err := mainThread.start(); err != nil {
return err
}
// initialize all threads as inactive
for i := 0; i < numThreads; i++ {
phpThreads[i] = newPHPThread(i)
convertToInactiveThread(phpThreads[i])
}
// start the underlying C threads
ready := sync.WaitGroup{}
ready.Add(numThreads)
for _, thread := range phpThreads {
go func() {
if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex))
}
thread.state.waitFor(stateInactive)
ready.Done()
}()
}
ready.Wait()
return nil
}
func drainPHPThreads() {
doneWG := sync.WaitGroup{}
doneWG.Add(len(phpThreads))
for _, thread := range phpThreads {
thread.handlerMu.Lock()
_ = thread.state.requestSafeStateChange(stateShuttingDown)
close(thread.drainChan)
}
close(mainThread.done)
for _, thread := range phpThreads {
go func(thread *phpThread) {
thread.state.waitFor(stateDone)
thread.handlerMu.Unlock()
doneWG.Done()
}(thread)
}
doneWG.Wait()
mainThread.state.set(stateShuttingDown)
mainThread.state.waitFor(stateDone)
phpThreads = nil
}
func (mainThread *phpMainThread) start() error {
if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 {
return MainThreadCreationError
}
mainThread.state.waitFor(stateReady)
return nil
}
func getInactivePHPThread() *phpThread {
for _, thread := range phpThreads {
if thread.state.is(stateInactive) {
return thread
}
}
panic("not enough threads reserved")
}
//export go_frankenphp_main_thread_is_ready
func go_frankenphp_main_thread_is_ready() {
mainThread.state.set(stateReady)
mainThread.state.waitFor(stateShuttingDown)
}
//export go_frankenphp_shutdown_main_thread
func go_frankenphp_shutdown_main_thread() {
mainThread.state.set(stateDone)
}

161
phpmainthread_test.go Normal file
View File

@@ -0,0 +1,161 @@
package frankenphp
import (
"io"
"math/rand/v2"
"net/http/httptest"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
var testDataPath, _ = filepath.Abs("./testdata")
func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
logger = zap.NewNop() // the logger needs to not be nil
assert.NoError(t, initPHPThreads(1)) // reserve 1 thread
assert.Len(t, phpThreads, 1)
assert.Equal(t, 0, phpThreads[0].threadIndex)
assert.True(t, phpThreads[0].state.is(stateInactive))
drainPHPThreads()
assert.Nil(t, phpThreads)
}
func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))
// transition to regular thread
convertToRegularThread(phpThreads[0])
assert.IsType(t, &regularThread{}, phpThreads[0].handler)
// transition to worker thread
worker := getDummyWorker("transition-worker-1.php")
convertToWorkerThread(phpThreads[0], worker)
assert.IsType(t, &workerThread{}, phpThreads[0].handler)
assert.Len(t, worker.threads, 1)
// transition back to inactive thread
convertToInactiveThread(phpThreads[0])
assert.IsType(t, &inactiveThread{}, phpThreads[0].handler)
assert.Len(t, worker.threads, 0)
drainPHPThreads()
assert.Nil(t, phpThreads)
}
func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))
firstWorker := getDummyWorker("transition-worker-1.php")
secondWorker := getDummyWorker("transition-worker-2.php")
// convert to first worker thread
convertToWorkerThread(phpThreads[0], firstWorker)
firstHandler := phpThreads[0].handler.(*workerThread)
assert.Same(t, firstWorker, firstHandler.worker)
assert.Len(t, firstWorker.threads, 1)
assert.Len(t, secondWorker.threads, 0)
// convert to second worker thread
convertToWorkerThread(phpThreads[0], secondWorker)
secondHandler := phpThreads[0].handler.(*workerThread)
assert.Same(t, secondWorker, secondHandler.worker)
assert.Len(t, firstWorker.threads, 0)
assert.Len(t, secondWorker.threads, 1)
drainPHPThreads()
assert.Nil(t, phpThreads)
}
func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
numThreads := 10
numRequestsPerThread := 100
isRunning := atomic.Bool{}
isRunning.Store(true)
wg := sync.WaitGroup{}
worker1Path := testDataPath + "/transition-worker-1.php"
worker2Path := testDataPath + "/transition-worker-2.php"
assert.NoError(t, Init(
WithNumThreads(numThreads),
WithWorkers(worker1Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithWorkers(worker2Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithLogger(zap.NewNop()),
))
// randomly transition threads between regular, inactive and 2 worker threads
go func() {
for {
for i := 0; i < numThreads; i++ {
switch rand.IntN(4) {
case 0:
convertToRegularThread(phpThreads[i])
case 1:
convertToWorkerThread(phpThreads[i], workers[worker1Path])
case 2:
convertToWorkerThread(phpThreads[i], workers[worker2Path])
case 3:
convertToInactiveThread(phpThreads[i])
}
time.Sleep(time.Millisecond)
if !isRunning.Load() {
return
}
}
}
}()
// randomly do requests to the 3 endpoints
wg.Add(numThreads)
for i := 0; i < numThreads; i++ {
go func(i int) {
for j := 0; j < numRequestsPerThread; j++ {
switch rand.IntN(3) {
case 0:
assertRequestBody(t, "http://localhost/transition-worker-1.php", "Hello from worker 1")
case 1:
assertRequestBody(t, "http://localhost/transition-worker-2.php", "Hello from worker 2")
case 2:
assertRequestBody(t, "http://localhost/transition-regular.php", "Hello from regular thread")
}
}
wg.Done()
}(i)
}
wg.Wait()
isRunning.Store(false)
Shutdown()
}
func getDummyWorker(fileName string) *worker {
if workers == nil {
workers = make(map[string]*worker)
}
worker, _ := newWorker(workerOpt{
fileName: testDataPath + "/" + fileName,
num: 1,
})
return worker
}
func assertRequestBody(t *testing.T, url string, expected string) {
r := httptest.NewRequest("GET", url, nil)
w := httptest.NewRecorder()
req, err := NewRequestWithContext(r, WithRequestDocumentRoot(testDataPath, false))
assert.NoError(t, err)
err = ServeHTTP(w, req)
assert.NoError(t, err)
resp := w.Result()
body, _ := io.ReadAll(resp.Body)
assert.Equal(t, expected, string(body))
}

View File

@@ -1,7 +1,6 @@
package frankenphp
// #include <stdint.h>
// #include <php_variables.h>
// #include "frankenphp.h"
import "C"
import (
"net/http"
@@ -10,32 +9,65 @@ import (
"unsafe"
)
var phpThreads []*phpThread
// representation of the actual underlying PHP thread
// identified by the index in the phpThreads slice
type phpThread struct {
runtime.Pinner
mainRequest *http.Request
workerRequest *http.Request
worker *worker
requestChan chan *http.Request
threadIndex int
knownVariableKeys map[string]*C.zend_string
readiedOnce sync.Once
requestChan chan *http.Request
drainChan chan struct{}
handlerMu *sync.Mutex
handler threadHandler
state *threadState
}
func initPHPThreads(numThreads int) {
phpThreads = make([]*phpThread, 0, numThreads)
for i := 0; i < numThreads; i++ {
phpThreads = append(phpThreads, &phpThread{})
// interface that defines how the callbacks from the C thread should be handled
type threadHandler interface {
beforeScriptExecution() string
afterScriptExecution(exitStatus int)
getActiveRequest() *http.Request
}
func newPHPThread(threadIndex int) *phpThread {
return &phpThread{
threadIndex: threadIndex,
drainChan: make(chan struct{}),
requestChan: make(chan *http.Request),
handlerMu: &sync.Mutex{},
state: newThreadState(),
}
}
// change the thread handler safely
// must be called from outside of the PHP thread
func (thread *phpThread) setHandler(handler threadHandler) {
logger.Debug("setHandler")
thread.handlerMu.Lock()
defer thread.handlerMu.Unlock()
if !thread.state.requestSafeStateChange(stateTransitionRequested) {
// no state change allowed == shutdown
return
}
close(thread.drainChan)
thread.state.waitFor(stateTransitionInProgress)
thread.handler = handler
thread.drainChan = make(chan struct{})
thread.state.set(stateTransitionComplete)
}
// transition to a new handler safely
// is triggered by setHandler and executed on the PHP thread
func (thread *phpThread) transitionToNewHandler() string {
thread.state.set(stateTransitionInProgress)
thread.state.waitFor(stateTransitionComplete)
// execute beforeScriptExecution of the new handler
return thread.handler.beforeScriptExecution()
}
func (thread *phpThread) getActiveRequest() *http.Request {
if thread.workerRequest != nil {
return thread.workerRequest
}
return thread.mainRequest
return thread.handler.getActiveRequest()
}
// Pin a string that is not null-terminated
@@ -50,3 +82,34 @@ func (thread *phpThread) pinString(s string) *C.char {
func (thread *phpThread) pinCString(s string) *C.char {
return thread.pinString(s + "\x00")
}
//export go_frankenphp_before_script_execution
func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char {
thread := phpThreads[threadIndex]
scriptName := thread.handler.beforeScriptExecution()
// if no scriptName is passed, shut down
if scriptName == "" {
return nil
}
// return the name of the PHP script that should be executed
return thread.pinCString(scriptName)
}
//export go_frankenphp_after_script_execution
func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C.int) {
thread := phpThreads[threadIndex]
if exitStatus < 0 {
panic(ScriptExecutionError)
}
thread.handler.afterScriptExecution(int(exitStatus))
// unpin all memory used during script execution
thread.Unpin()
}
//export go_frankenphp_on_thread_shutdown
func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) {
phpThreads[threadIndex].Unpin()
phpThreads[threadIndex].state.set(stateDone)
}

View File

@@ -1,40 +0,0 @@
package frankenphp
import (
"net/http"
"testing"
"github.com/stretchr/testify/assert"
)
func TestInitializeTwoPhpThreadsWithoutRequests(t *testing.T) {
initPHPThreads(2)
assert.Len(t, phpThreads, 2)
assert.NotNil(t, phpThreads[0])
assert.NotNil(t, phpThreads[1])
assert.Nil(t, phpThreads[0].mainRequest)
assert.Nil(t, phpThreads[0].workerRequest)
}
func TestMainRequestIsActiveRequest(t *testing.T) {
mainRequest := &http.Request{}
initPHPThreads(1)
thread := phpThreads[0]
thread.mainRequest = mainRequest
assert.Equal(t, mainRequest, thread.getActiveRequest())
}
func TestWorkerRequestIsActiveRequest(t *testing.T) {
mainRequest := &http.Request{}
workerRequest := &http.Request{}
initPHPThreads(1)
thread := phpThreads[0]
thread.mainRequest = mainRequest
thread.workerRequest = workerRequest
assert.Equal(t, workerRequest, thread.getActiveRequest())
}

142
state.go Normal file
View File

@@ -0,0 +1,142 @@
package frankenphp
import (
"slices"
"strconv"
"sync"
)
type stateID uint8
const (
// lifecycle states of a thread
stateBooting stateID = iota
stateShuttingDown
stateDone
// these states are safe to transition from at any time
stateInactive
stateReady
// states necessary for restarting workers
stateRestarting
stateYielding
// states necessary for transitioning between different handlers
stateTransitionRequested
stateTransitionInProgress
stateTransitionComplete
)
type threadState struct {
currentState stateID
mu sync.RWMutex
subscribers []stateSubscriber
}
type stateSubscriber struct {
states []stateID
ch chan struct{}
}
func newThreadState() *threadState {
return &threadState{
currentState: stateBooting,
subscribers: []stateSubscriber{},
mu: sync.RWMutex{},
}
}
func (ts *threadState) is(state stateID) bool {
ts.mu.RLock()
ok := ts.currentState == state
ts.mu.RUnlock()
return ok
}
func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
ts.mu.Lock()
ok := ts.currentState == compareTo
if ok {
ts.currentState = swapTo
ts.notifySubscribers(swapTo)
}
ts.mu.Unlock()
return ok
}
func (ts *threadState) name() string {
// TODO: return the actual name for logging/metrics
return "state:" + strconv.Itoa(int(ts.get()))
}
func (ts *threadState) get() stateID {
ts.mu.RLock()
id := ts.currentState
ts.mu.RUnlock()
return id
}
func (ts *threadState) set(nextState stateID) {
ts.mu.Lock()
ts.currentState = nextState
ts.notifySubscribers(nextState)
ts.mu.Unlock()
}
func (ts *threadState) notifySubscribers(nextState stateID) {
if len(ts.subscribers) == 0 {
return
}
newSubscribers := []stateSubscriber{}
// notify subscribers to the state change
for _, sub := range ts.subscribers {
if !slices.Contains(sub.states, nextState) {
newSubscribers = append(newSubscribers, sub)
continue
}
close(sub.ch)
}
ts.subscribers = newSubscribers
}
// block until the thread reaches a certain state
func (ts *threadState) waitFor(states ...stateID) {
ts.mu.Lock()
if slices.Contains(states, ts.currentState) {
ts.mu.Unlock()
return
}
sub := stateSubscriber{
states: states,
ch: make(chan struct{}),
}
ts.subscribers = append(ts.subscribers, sub)
ts.mu.Unlock()
<-sub.ch
}
// safely request a state change from a different goroutine
func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
ts.mu.Lock()
switch ts.currentState {
// disallow state changes if shutting down
case stateShuttingDown, stateDone:
ts.mu.Unlock()
return false
// ready and inactive are safe states to transition from
case stateReady, stateInactive:
ts.currentState = nextState
ts.notifySubscribers(nextState)
ts.mu.Unlock()
return true
}
ts.mu.Unlock()
// wait for the state to change to a safe state
ts.waitFor(stateReady, stateInactive, stateShuttingDown)
return ts.requestSafeStateChange(nextState)
}

56
state_test.go Normal file
View File

@@ -0,0 +1,56 @@
package frankenphp
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test2GoroutinesYieldToEachOtherViaStates(t *testing.T) {
threadState := &threadState{currentState: stateBooting}
go func() {
threadState.waitFor(stateInactive)
assert.True(t, threadState.is(stateInactive))
threadState.set(stateReady)
}()
threadState.set(stateInactive)
threadState.waitFor(stateReady)
assert.True(t, threadState.is(stateReady))
}
func TestStateShouldHaveCorrectAmountOfSubscribers(t *testing.T) {
threadState := &threadState{currentState: stateBooting}
// 3 subscribers waiting for different states
go threadState.waitFor(stateInactive)
go threadState.waitFor(stateInactive, stateShuttingDown)
go threadState.waitFor(stateShuttingDown)
assertNumberOfSubscribers(t, threadState, 3)
threadState.set(stateInactive)
assertNumberOfSubscribers(t, threadState, 1)
assert.True(t, threadState.compareAndSwap(stateInactive, stateShuttingDown))
assertNumberOfSubscribers(t, threadState, 0)
}
func assertNumberOfSubscribers(t *testing.T, threadState *threadState, expected int) {
maxWaits := 10_000 // wait for 1 second max
for i := 0; i < maxWaits; i++ {
time.Sleep(100 * time.Microsecond)
threadState.mu.RLock()
if len(threadState.subscribers) == expected {
threadState.mu.RUnlock()
break
}
threadState.mu.RUnlock()
}
threadState.mu.RLock()
assert.Len(t, threadState.subscribers, expected)
threadState.mu.RUnlock()
}

9
testdata/fiber-basic.php vendored Normal file
View File

@@ -0,0 +1,9 @@
<?php
require_once __DIR__.'/_executor.php';
return function() {
$fiber = new Fiber(function() {
echo 'Fiber '.($_GET['i'] ?? '');
});
$fiber->start();
};

3
testdata/transition-regular.php vendored Normal file
View File

@@ -0,0 +1,3 @@
<?php
echo "Hello from regular thread";

7
testdata/transition-worker-1.php vendored Normal file
View File

@@ -0,0 +1,7 @@
<?php
while (frankenphp_handle_request(function () {
echo "Hello from worker 1";
})) {
}

9
testdata/transition-worker-2.php vendored Normal file
View File

@@ -0,0 +1,9 @@
<?php
while (frankenphp_handle_request(function () {
echo "Hello from worker 2";
// Simulate work to force potential race conditions (phpmainthread_test.go)
usleep(1000);
})) {
}

45
thread-inactive.go Normal file
View File

@@ -0,0 +1,45 @@
package frankenphp
import (
"net/http"
)
// representation of a thread with no work assigned to it
// implements the threadHandler interface
type inactiveThread struct {
thread *phpThread
}
func convertToInactiveThread(thread *phpThread) {
if thread.handler == nil {
thread.handler = &inactiveThread{thread: thread}
return
}
thread.setHandler(&inactiveThread{thread: thread})
}
func (handler *inactiveThread) beforeScriptExecution() string {
thread := handler.thread
switch thread.state.get() {
case stateTransitionRequested:
return thread.transitionToNewHandler()
case stateBooting, stateTransitionComplete:
thread.state.set(stateInactive)
// wait for external signal to start or shut down
thread.state.waitFor(stateTransitionRequested, stateShuttingDown)
return handler.beforeScriptExecution()
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + thread.state.name())
}
func (thread *inactiveThread) afterScriptExecution(exitStatus int) {
panic("inactive threads should not execute scripts")
}
func (thread *inactiveThread) getActiveRequest() *http.Request {
panic("inactive threads have no requests")
}

78
thread-regular.go Normal file
View File

@@ -0,0 +1,78 @@
package frankenphp
// #include "frankenphp.h"
import "C"
import (
"net/http"
)
// representation of a non-worker PHP thread
// executes PHP scripts in a web context
// implements the threadHandler interface
type regularThread struct {
state *threadState
thread *phpThread
activeRequest *http.Request
}
func convertToRegularThread(thread *phpThread) {
thread.setHandler(&regularThread{
thread: thread,
state: thread.state,
})
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *regularThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
return handler.thread.transitionToNewHandler()
case stateTransitionComplete:
handler.state.set(stateReady)
return handler.waitForRequest()
case stateReady:
return handler.waitForRequest()
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
// return true if the worker should continue to run
func (handler *regularThread) afterScriptExecution(exitStatus int) {
handler.afterRequest(exitStatus)
}
func (handler *regularThread) getActiveRequest() *http.Request {
return handler.activeRequest
}
func (handler *regularThread) waitForRequest() string {
select {
case <-handler.thread.drainChan:
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
case r := <-requestChan:
handler.activeRequest = r
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if err := updateServerContext(handler.thread, r, true, false); err != nil {
rejectRequest(fc.responseWriter, err.Error())
handler.afterRequest(0)
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
}
// set the scriptFilename that should be executed
return fc.scriptFilename
}
}
func (handler *regularThread) afterRequest(exitStatus int) {
fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
maybeCloseContext(fc)
handler.activeRequest = nil
}

222
thread-worker.go Normal file
View File

@@ -0,0 +1,222 @@
package frankenphp
// #include "frankenphp.h"
import "C"
import (
"net/http"
"path/filepath"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// representation of a thread assigned to a worker script
// executes the PHP worker script in a loop
// implements the threadHandler interface
type workerThread struct {
state *threadState
thread *phpThread
worker *worker
fakeRequest *http.Request
workerRequest *http.Request
backoff *exponentialBackoff
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
worker: worker,
backoff: &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
},
})
worker.attachThread(thread)
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
func (handler *workerThread) afterScriptExecution(exitStatus int) {
tearDownWorkerScript(handler, exitStatus)
}
func (handler *workerThread) getActiveRequest() *http.Request {
if handler.workerRequest != nil {
return handler.workerRequest
}
return handler.fakeRequest
}
func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.wait()
metrics.StartWorker(worker.fileName)
// Create a dummy request to set up the worker
r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
if err := updateServerContext(handler.thread, r, true, false); err != nil {
panic(err)
}
handler.fakeRequest = r
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
}
}
func tearDownWorkerScript(handler *workerThread, exitStatus int) {
// if the worker request is not nil, the script might have crashed
// make sure to close the worker request context
if handler.workerRequest != nil {
fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
handler.workerRequest = nil
}
fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
defer func() {
handler.fakeRequest = nil
}()
// on exit status 0 we just run the worker script again
worker := handler.worker
if fc.exitStatus == 0 {
// TODO: make the max restart configurable
metrics.StopWorker(worker.fileName, StopReasonRestart)
handler.backoff.recordSuccess()
if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
return
}
// TODO: error status
// on exit status 1 we apply an exponential backoff when restarting
metrics.StopWorker(worker.fileName, StopReasonCrash)
if handler.backoff.recordFailure() {
if !watcherIsEnabled {
logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
}
}
func (handler *workerThread) waitForWorkerRequest() bool {
// unpin any memory left over from previous requests
handler.thread.Unpin()
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
}
if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
metrics.ReadyWorker(handler.worker.fileName)
}
var r *http.Request
select {
case <-handler.thread.drainChan:
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
}
// execute opcache_reset if the restart was triggered by the watcher
if watcherIsEnabled && handler.state.is(stateRestarting) {
C.frankenphp_reset_opcache()
}
return false
case r = <-handler.thread.requestChan:
case r = <-handler.worker.requestChan:
}
handler.workerRequest = r
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
}
if err := updateServerContext(handler.thread, r, false, true); err != nil {
// Unexpected error or invalid request
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
rejectRequest(fc.responseWriter, err.Error())
maybeCloseContext(fc)
handler.workerRequest = nil
return handler.waitForWorkerRequest()
}
return true
}
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
handler := phpThreads[threadIndex].handler.(*workerThread)
return C.bool(handler.waitForWorkerRequest())
}
//export go_frankenphp_finish_worker_request
func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
thread := phpThreads[threadIndex]
r := thread.getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
thread.handler.(*workerThread).workerRequest = nil
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
}
}
// when frankenphp_finish_request() is directly called from PHP
//
//export go_frankenphp_finish_php_request
func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
r := phpThreads[threadIndex].getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("url", r.RequestURI))
}
}

321
worker.go
View File

@@ -1,21 +1,18 @@
package frankenphp
// #include <stdlib.h>
// #include "frankenphp.h"
import "C"
import (
"fmt"
"github.com/dunglas/frankenphp/internal/fastabs"
"net/http"
"path/filepath"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/watcher"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// represents a worker script and can have many threads assigned to it
type worker struct {
fileName string
num int
@@ -23,40 +20,45 @@ type worker struct {
requestChan chan *http.Request
threads []*phpThread
threadMutex sync.RWMutex
ready chan struct{}
}
var (
workers map[string]*worker
watcherIsEnabled bool
workerShutdownWG sync.WaitGroup
workersDone chan interface{}
workers = make(map[string]*worker)
)
func initWorkers(opt []workerOpt) error {
workersDone = make(chan interface{})
ready := sync.WaitGroup{}
workers = make(map[string]*worker, len(opt))
workersReady := sync.WaitGroup{}
directoriesToWatch := getDirectoriesToWatch(opt)
watcherIsEnabled = len(directoriesToWatch) > 0
for _, o := range opt {
worker, err := newWorker(o)
worker.threads = make([]*phpThread, 0, o.num)
workersReady.Add(o.num)
if err != nil {
return err
}
for i := 0; i < worker.num; i++ {
go worker.startNewWorkerThread()
thread := getInactivePHPThread()
convertToWorkerThread(thread, worker)
go func() {
thread.state.waitFor(stateReady)
workersReady.Done()
}()
}
ready.Add(1)
go func() {
for i := 0; i < worker.num; i++ {
<-worker.ready
}
ready.Done()
}()
}
ready.Wait()
workersReady.Wait()
if !watcherIsEnabled {
return nil
}
if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
return err
}
return nil
}
@@ -67,12 +69,6 @@ func newWorker(o workerOpt) (*worker, error) {
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
}
// if the worker already exists, return it,
// it's necessary since we don't want to destroy the channels when restarting on file changes
if w, ok := workers[absFileName]; ok {
return w, nil
}
if o.env == nil {
o.env = make(PreparedEnv, 1)
}
@@ -83,247 +79,90 @@ func newWorker(o workerOpt) (*worker, error) {
num: o.num,
env: o.env,
requestChan: make(chan *http.Request),
ready: make(chan struct{}, o.num),
}
workers[absFileName] = w
return w, nil
}
func (worker *worker) startNewWorkerThread() {
workerShutdownWG.Add(1)
defer workerShutdownWG.Done()
backoff := &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
}
for {
// if the worker can stay up longer than backoff*2, it is probably an application error
backoff.wait()
metrics.StartWorker(worker.fileName)
// Create main dummy request
r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
}
if err := ServeHTTP(nil, r); err != nil {
panic(err)
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
// if we are done, exit the loop that restarts the worker script
select {
case _, ok := <-workersDone:
if !ok {
metrics.StopWorker(worker.fileName, StopReasonShutdown)
if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
return
}
// continue on since the channel is still open
default:
// continue on since the channel is still open
}
// on exit status 0 we just run the worker script again
if fc.exitStatus == 0 {
// TODO: make the max restart configurable
if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
metrics.StopWorker(worker.fileName, StopReasonRestart)
backoff.recordSuccess()
continue
}
// on exit status 1 we log the error and apply an exponential backoff when restarting
if backoff.recordFailure() {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount))
}
metrics.StopWorker(worker.fileName, StopReasonCrash)
}
// unreachable
}
func (worker *worker) handleRequest(r *http.Request) {
worker.threadMutex.RLock()
// dispatch requests to all worker threads in order
for _, thread := range worker.threads {
select {
case thread.requestChan <- r:
worker.threadMutex.RUnlock()
return
default:
}
}
worker.threadMutex.RUnlock()
// if no thread was available, fan the request out to all threads
// TODO: theoretically there could be autoscaling of threads here
worker.requestChan <- r
}
func stopWorkers() {
close(workersDone)
}
func drainWorkers() {
watcher.DrainWatcher()
watcherIsEnabled = false
stopWorkers()
workerShutdownWG.Wait()
workers = make(map[string]*worker)
}
func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
var directoriesToWatch []string
func restartWorkers() {
ready := sync.WaitGroup{}
threadsToRestart := make([]*phpThread, 0)
for _, worker := range workers {
worker.threadMutex.RLock()
ready.Add(len(worker.threads))
for _, thread := range worker.threads {
if !thread.state.requestSafeStateChange(stateRestarting) {
// no state change allowed = shutdown
continue
}
close(thread.drainChan)
threadsToRestart = append(threadsToRestart, thread)
go func(thread *phpThread) {
thread.state.waitFor(stateYielding)
ready.Done()
}(thread)
}
worker.threadMutex.RUnlock()
}
ready.Wait()
for _, thread := range threadsToRestart {
thread.drainChan = make(chan struct{})
thread.state.set(stateReady)
}
}
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
directoriesToWatch := []string{}
for _, w := range workerOpts {
directoriesToWatch = append(directoriesToWatch, w.watch...)
}
watcherIsEnabled = len(directoriesToWatch) > 0
if !watcherIsEnabled {
return nil
}
restartWorkers := func() {
restartWorkers(workerOpts)
}
if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
return err
}
return nil
return directoriesToWatch
}
func restartWorkers(workerOpts []workerOpt) {
stopWorkers()
workerShutdownWG.Wait()
if err := initWorkers(workerOpts); err != nil {
logger.Error("failed to restart workers when watching files")
panic(err)
}
logger.Info("workers restarted successfully")
}
func assignThreadToWorker(thread *phpThread) {
fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
worker, ok := workers[fc.scriptFilename]
if !ok {
panic("worker not found for script: " + fc.scriptFilename)
}
thread.worker = worker
thread.requestChan = make(chan *http.Request)
func (worker *worker) attachThread(thread *phpThread) {
worker.threadMutex.Lock()
worker.threads = append(worker.threads, thread)
worker.threadMutex.Unlock()
}
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
thread := phpThreads[threadIndex]
// we assign a worker to the thread if it doesn't have one already
if thread.worker == nil {
assignThreadToWorker(thread)
}
thread.readiedOnce.Do(func() {
// inform metrics that the worker is ready
metrics.ReadyWorker(thread.worker.fileName)
})
select {
case thread.worker.ready <- struct{}{}:
default:
}
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
}
var r *http.Request
select {
case <-workersDone:
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
func (worker *worker) detachThread(thread *phpThread) {
worker.threadMutex.Lock()
for i, t := range worker.threads {
if t == thread {
worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
break
}
thread.worker = nil
C.frankenphp_reset_opcache()
return C.bool(false)
case r = <-thread.worker.requestChan:
case r = <-thread.requestChan:
}
thread.workerRequest = r
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
}
if err := updateServerContext(thread, r, false, true); err != nil {
// Unexpected error
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
rejectRequest(fc.responseWriter, err.Error())
maybeCloseContext(fc)
thread.workerRequest = nil
thread.Unpin()
return go_frankenphp_worker_handle_request_start(threadIndex)
}
return C.bool(true)
worker.threadMutex.Unlock()
}
//export go_frankenphp_finish_request
func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
thread := phpThreads[threadIndex]
r := thread.getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
metrics.StartWorkerRequest(fc.scriptFilename)
if isWorkerRequest {
thread.workerRequest = nil
}
maybeCloseContext(fc)
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
var fields []zap.Field
if isWorkerRequest {
fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
} else {
fields = append(fields, zap.String("url", r.RequestURI))
// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
for _, thread := range worker.threads {
select {
case thread.requestChan <- r:
worker.threadMutex.RUnlock()
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
return
default:
}
c.Write(fields...)
}
worker.threadMutex.RUnlock()
if isWorkerRequest {
thread.Unpin()
}
// if no thread was available, fan the request out to all threads
// TODO: theoretically there could be autoscaling of threads here
worker.requestChan <- r
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
}