diff --git a/backoff.go b/backoff.go deleted file mode 100644 index a4bce80f..00000000 --- a/backoff.go +++ /dev/null @@ -1,51 +0,0 @@ -package frankenphp - -import ( - "sync" - "time" -) - -type exponentialBackoff struct { - backoff time.Duration - failureCount int - mu sync.RWMutex - maxBackoff time.Duration - minBackoff time.Duration - maxConsecutiveFailures int -} - -// recordSuccess resets the backoff and failureCount -func (e *exponentialBackoff) recordSuccess() { - e.mu.Lock() - e.failureCount = 0 - e.backoff = e.minBackoff - e.mu.Unlock() -} - -// recordFailure increments the failure count and increases the backoff, it returns true if maxConsecutiveFailures has been reached -func (e *exponentialBackoff) recordFailure() bool { - e.mu.Lock() - e.failureCount += 1 - if e.backoff < e.minBackoff { - e.backoff = e.minBackoff - } - - e.backoff = min(e.backoff*2, e.maxBackoff) - - e.mu.Unlock() - return e.maxConsecutiveFailures != -1 && e.failureCount >= e.maxConsecutiveFailures -} - -// wait sleeps for the backoff duration if failureCount is non-zero. -// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple) -func (e *exponentialBackoff) wait() { - e.mu.RLock() - if e.failureCount == 0 { - e.mu.RUnlock() - - return - } - e.mu.RUnlock() - - time.Sleep(e.backoff) -} diff --git a/backoff_test.go b/backoff_test.go deleted file mode 100644 index 5ced2e4c..00000000 --- a/backoff_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package frankenphp - -import ( - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestExponentialBackoff_Reset(t *testing.T) { - e := &exponentialBackoff{ - maxBackoff: 5 * time.Second, - minBackoff: 500 * time.Millisecond, - maxConsecutiveFailures: 3, - } - - assert.False(t, e.recordFailure()) - assert.False(t, e.recordFailure()) - e.recordSuccess() - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0") - assert.Equal(t, e.backoff, e.minBackoff, "expected backoff to be reset to minBackoff") -} - -func TestExponentialBackoff_Trigger(t *testing.T) { - e := &exponentialBackoff{ - maxBackoff: 500 * 3 * time.Millisecond, - minBackoff: 500 * time.Millisecond, - maxConsecutiveFailures: 3, - } - - assert.False(t, e.recordFailure()) - assert.False(t, e.recordFailure()) - assert.True(t, e.recordFailure()) - - e.mu.RLock() - defer e.mu.RUnlock() - assert.Equal(t, e.failureCount, e.maxConsecutiveFailures, "expected failureCount to be maxConsecutiveFailures") - assert.Equal(t, e.backoff, e.maxBackoff, "expected backoff to be maxBackoff") -} diff --git a/cgi.go b/cgi.go index 09c60f48..63fb1339 100644 --- a/cgi.go +++ b/cgi.go @@ -277,13 +277,13 @@ func splitPos(path string, splitPath []string) int { // See: https://github.com/php/php-src/blob/345e04b619c3bc11ea17ee02cdecad6ae8ce5891/main/SAPI.h#L72 // //export go_update_request_info -func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) C.bool { +func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) { thread := phpThreads[threadIndex] fc := thread.frankenPHPContext() request := fc.request if request == nil { - return C.bool(fc.worker != nil) + return } authUser, authPassword, ok := request.BasicAuth() @@ -311,8 +311,6 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) info.request_uri = thread.pinCString(request.URL.RequestURI()) info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor) - - return C.bool(fc.worker != nil) } // SanitizedPathJoin performs filepath.Join(root, reqPath) that diff --git a/debugstate.go b/debugstate.go index a7941ac7..c18813ec 100644 --- a/debugstate.go +++ b/debugstate.go @@ -1,5 +1,9 @@ package frankenphp +import ( + "github.com/dunglas/frankenphp/internal/state" +) + // EXPERIMENTAL: ThreadDebugState prints the state of a single PHP thread - debugging purposes only type ThreadDebugState struct { Index int @@ -23,7 +27,7 @@ func DebugState() FrankenPHPDebugState { ReservedThreadCount: 0, } for _, thread := range phpThreads { - if thread.state.is(stateReserved) { + if thread.state.Is(state.Reserved) { fullState.ReservedThreadCount++ continue } @@ -38,9 +42,9 @@ func threadDebugState(thread *phpThread) ThreadDebugState { return ThreadDebugState{ Index: thread.threadIndex, Name: thread.name(), - State: thread.state.name(), - IsWaiting: thread.state.isInWaitingState(), - IsBusy: !thread.state.isInWaitingState(), - WaitingSinceMilliseconds: thread.state.waitTime(), + State: thread.state.Name(), + IsWaiting: thread.state.IsInWaitingState(), + IsBusy: !thread.state.IsInWaitingState(), + WaitingSinceMilliseconds: thread.state.WaitTime(), } } diff --git a/env.go b/env.go index 9e6fbfdf..3ac9a3ad 100644 --- a/env.go +++ b/env.go @@ -1,10 +1,9 @@ package frankenphp // #cgo nocallback frankenphp_init_persistent_string -// #cgo nocallback frankenphp_add_assoc_str_ex // #cgo noescape frankenphp_init_persistent_string -// #cgo noescape frankenphp_add_assoc_str_ex // #include "frankenphp.h" +// #include import "C" import ( "os" @@ -98,7 +97,7 @@ func go_getfullenv(threadIndex C.uintptr_t, trackVarsArray *C.zval) { env := getSandboxedEnv(thread) for key, val := range env { - C.frankenphp_add_assoc_str_ex(trackVarsArray, toUnsafeChar(key), C.size_t(len(key)), val) + C.add_assoc_str_ex(trackVarsArray, toUnsafeChar(key), C.size_t(len(key)), val) } } diff --git a/frankenphp.c b/frankenphp.c index 2ad6e30c..04782a9b 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -51,7 +51,6 @@ frankenphp_version frankenphp_get_version() { frankenphp_config frankenphp_get_config() { return (frankenphp_config){ - frankenphp_get_version(), #ifdef ZTS true, #else @@ -75,6 +74,10 @@ __thread uintptr_t thread_index; __thread bool is_worker_thread = false; __thread zval *os_environment = NULL; +void frankenphp_update_local_thread_context(bool is_worker) { + is_worker_thread = is_worker; +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -82,7 +85,7 @@ static void frankenphp_update_request_context() { /* status It is not reset by zend engine, set it to 200. */ SG(sapi_headers).http_response_code = 200; - is_worker_thread = go_update_request_info(thread_index, &SG(request_info)); + go_update_request_info(thread_index, &SG(request_info)); } static void frankenphp_free_request_context() { @@ -206,11 +209,6 @@ PHPAPI void get_full_env(zval *track_vars_array) { go_getfullenv(thread_index, track_vars_array); } -void frankenphp_add_assoc_str_ex(zval *track_vars_array, char *key, - size_t keylen, zend_string *val) { - add_assoc_str_ex(track_vars_array, key, keylen, val); -} - /* Adapted from php_request_startup() */ static int frankenphp_worker_request_startup() { int retval = SUCCESS; @@ -652,8 +650,9 @@ static char *frankenphp_read_cookies(void) { } /* all variables with well defined keys can safely be registered like this */ -void frankenphp_register_trusted_var(zend_string *z_key, char *value, - size_t val_len, HashTable *ht) { +static inline void frankenphp_register_trusted_var(zend_string *z_key, + char *value, size_t val_len, + HashTable *ht) { if (value == NULL) { zval empty; ZVAL_EMPTY_STRING(&empty); diff --git a/frankenphp.h b/frankenphp.h index c17df606..efbd5fc4 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -23,12 +23,6 @@ typedef struct ht_key_value_pair { size_t val_len; } ht_key_value_pair; -typedef struct php_variable { - const char *var; - size_t data_len; - char *data; -} php_variable; - typedef struct frankenphp_version { unsigned char major_version; unsigned char minor_version; @@ -40,7 +34,6 @@ typedef struct frankenphp_version { frankenphp_version frankenphp_get_version(); typedef struct frankenphp_config { - frankenphp_version version; bool zts; bool zend_signals; bool zend_max_execution_timers; @@ -52,6 +45,7 @@ bool frankenphp_new_php_thread(uintptr_t thread_index); bool frankenphp_shutdown_dummy_request(void); int frankenphp_execute_script(char *file_name); +void frankenphp_update_local_thread_context(bool is_worker); int frankenphp_execute_script_cli(char *script, int argc, char **argv, bool eval); @@ -65,8 +59,6 @@ void frankenphp_register_variable_safe(char *key, char *var, size_t val_len, zend_string *frankenphp_init_persistent_string(const char *string, size_t len); int frankenphp_reset_opcache(void); int frankenphp_get_current_memory_limit(); -void frankenphp_add_assoc_str_ex(zval *track_vars_array, char *key, - size_t keylen, zend_string *val); void frankenphp_register_single(zend_string *z_key, char *value, size_t val_len, zval *track_vars_array); diff --git a/frankenphp_test.go b/frankenphp_test.go index f2273c65..427b731f 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -618,10 +618,12 @@ func testRequestHeaders(t *testing.T, opts *testOptions) { } func TestFailingWorker(t *testing.T) { - runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { - body, _ := testGet("http://example.com/failing-worker.php", handler, t) - assert.Contains(t, body, "ok") - }, &testOptions{workerScript: "failing-worker.php"}) + err := frankenphp.Init( + frankenphp.WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))), + frankenphp.WithWorkers("failing worker", "testdata/failing-worker.php", 4, frankenphp.WithWorkerMaxFailures(1)), + frankenphp.WithNumThreads(5), + ) + assert.Error(t, err, "should return an immediate error if workers fail on startup") } func TestEnv(t *testing.T) { diff --git a/internal/phpheaders/phpheaders.go b/internal/phpheaders/phpheaders.go index 19f1908d..71f23e7d 100644 --- a/internal/phpheaders/phpheaders.go +++ b/internal/phpheaders/phpheaders.go @@ -1,5 +1,6 @@ package phpheaders +import "C" import ( "context" "strings" diff --git a/internal/phpheaders/phpheaders_test.go b/internal/phpheaders/phpheaders_test.go new file mode 100644 index 00000000..5382c1ed --- /dev/null +++ b/internal/phpheaders/phpheaders_test.go @@ -0,0 +1,22 @@ +package phpheaders + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllCommonHeadersAreCorrect(t *testing.T) { + fakeRequest := httptest.NewRequest("GET", "http://localhost", nil) + + for header, phpHeader := range CommonRequestHeaders { + // verify that common and uncommon headers return the same result + expectedPHPHeader := GetUnCommonHeader(t.Context(), header) + assert.Equal(t, phpHeader+"\x00", expectedPHPHeader, "header is not well formed: "+phpHeader) + + // net/http will capitalize lowercase headers, verify that headers are capitalized + fakeRequest.Header.Add(header, "foo") + assert.Contains(t, fakeRequest.Header, header, "header is not correctly capitalized: "+header) + } +} diff --git a/state.go b/internal/state/state.go similarity index 55% rename from state.go rename to internal/state/state.go index 71970c83..52c2d0b1 100644 --- a/state.go +++ b/internal/state/state.go @@ -1,51 +1,38 @@ -package frankenphp +package state +import "C" import ( "slices" "sync" "time" ) -type stateID uint8 +type State string const ( - // livecycle states of a thread - stateReserved stateID = iota - stateBooting - stateBootRequested - stateShuttingDown - stateDone + // livecycle States of a thread + Reserved State = "reserved" + Booting State = "booting" + BootRequested State = "boot requested" + ShuttingDown State = "shutting down" + Done State = "done" - // these states are 'stable' and safe to transition from at any time - stateInactive - stateReady + // these States are 'stable' and safe to transition from at any time + Inactive State = "inactive" + Ready State = "ready" - // states necessary for restarting workers - stateRestarting - stateYielding + // States necessary for restarting workers + Restarting State = "restarting" + Yielding State = "yielding" - // states necessary for transitioning between different handlers - stateTransitionRequested - stateTransitionInProgress - stateTransitionComplete + // States necessary for transitioning between different handlers + TransitionRequested State = "transition requested" + TransitionInProgress State = "transition in progress" + TransitionComplete State = "transition complete" ) -var stateNames = map[stateID]string{ - stateReserved: "reserved", - stateBooting: "booting", - stateInactive: "inactive", - stateReady: "ready", - stateShuttingDown: "shutting down", - stateDone: "done", - stateRestarting: "restarting", - stateYielding: "yielding", - stateTransitionRequested: "transition requested", - stateTransitionInProgress: "transition in progress", - stateTransitionComplete: "transition complete", -} - -type threadState struct { - currentState stateID +type ThreadState struct { + currentState State mu sync.RWMutex subscribers []stateSubscriber // how long threads have been waiting in stable states @@ -54,19 +41,19 @@ type threadState struct { } type stateSubscriber struct { - states []stateID + states []State ch chan struct{} } -func newThreadState() *threadState { - return &threadState{ - currentState: stateReserved, +func NewThreadState() *ThreadState { + return &ThreadState{ + currentState: Reserved, subscribers: []stateSubscriber{}, mu: sync.RWMutex{}, } } -func (ts *threadState) is(state stateID) bool { +func (ts *ThreadState) Is(state State) bool { ts.mu.RLock() ok := ts.currentState == state ts.mu.RUnlock() @@ -74,7 +61,7 @@ func (ts *threadState) is(state stateID) bool { return ok } -func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { +func (ts *ThreadState) CompareAndSwap(compareTo State, swapTo State) bool { ts.mu.Lock() ok := ts.currentState == compareTo if ok { @@ -86,11 +73,11 @@ func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { return ok } -func (ts *threadState) name() string { - return stateNames[ts.get()] +func (ts *ThreadState) Name() string { + return string(ts.Get()) } -func (ts *threadState) get() stateID { +func (ts *ThreadState) Get() State { ts.mu.RLock() id := ts.currentState ts.mu.RUnlock() @@ -98,14 +85,14 @@ func (ts *threadState) get() stateID { return id } -func (ts *threadState) set(nextState stateID) { +func (ts *ThreadState) Set(nextState State) { ts.mu.Lock() ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() } -func (ts *threadState) notifySubscribers(nextState stateID) { +func (ts *ThreadState) notifySubscribers(nextState State) { if len(ts.subscribers) == 0 { return } @@ -122,7 +109,7 @@ func (ts *threadState) notifySubscribers(nextState stateID) { } // block until the thread reaches a certain state -func (ts *threadState) waitFor(states ...stateID) { +func (ts *ThreadState) WaitFor(states ...State) { ts.mu.Lock() if slices.Contains(states, ts.currentState) { ts.mu.Unlock() @@ -138,15 +125,15 @@ func (ts *threadState) waitFor(states ...stateID) { } // safely request a state change from a different goroutine -func (ts *threadState) requestSafeStateChange(nextState stateID) bool { +func (ts *ThreadState) RequestSafeStateChange(nextState State) bool { ts.mu.Lock() switch ts.currentState { // disallow state changes if shutting down or done - case stateShuttingDown, stateDone, stateReserved: + case ShuttingDown, Done, Reserved: ts.mu.Unlock() return false // ready and inactive are safe states to transition from - case stateReady, stateInactive: + case Ready, Inactive: ts.currentState = nextState ts.notifySubscribers(nextState) ts.mu.Unlock() @@ -155,12 +142,12 @@ func (ts *threadState) requestSafeStateChange(nextState stateID) bool { ts.mu.Unlock() // wait for the state to change to a safe state - ts.waitFor(stateReady, stateInactive, stateShuttingDown) - return ts.requestSafeStateChange(nextState) + ts.WaitFor(Ready, Inactive, ShuttingDown) + return ts.RequestSafeStateChange(nextState) } // markAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown -func (ts *threadState) markAsWaiting(isWaiting bool) { +func (ts *ThreadState) MarkAsWaiting(isWaiting bool) { ts.mu.Lock() if isWaiting { ts.isWaiting = true @@ -172,7 +159,7 @@ func (ts *threadState) markAsWaiting(isWaiting bool) { } // isWaitingState returns true if a thread is waiting for a request or shutdown -func (ts *threadState) isInWaitingState() bool { +func (ts *ThreadState) IsInWaitingState() bool { ts.mu.RLock() isWaiting := ts.isWaiting ts.mu.RUnlock() @@ -180,7 +167,7 @@ func (ts *threadState) isInWaitingState() bool { } // waitTime returns the time since the thread is waiting in a stable state in ms -func (ts *threadState) waitTime() int64 { +func (ts *ThreadState) WaitTime() int64 { ts.mu.RLock() waitTime := int64(0) if ts.isWaiting { @@ -189,3 +176,9 @@ func (ts *threadState) waitTime() int64 { ts.mu.RUnlock() return waitTime } + +func (ts *ThreadState) SetWaitTime(t time.Time) { + ts.mu.Lock() + ts.waitingSince = t + ts.mu.Unlock() +} diff --git a/state_test.go b/internal/state/state_test.go similarity index 53% rename from state_test.go rename to internal/state/state_test.go index 7055d35f..3da52660 100644 --- a/state_test.go +++ b/internal/state/state_test.go @@ -1,4 +1,4 @@ -package frankenphp +package state import ( "testing" @@ -8,37 +8,38 @@ import ( ) func Test2GoroutinesYieldToEachOtherViaStates(t *testing.T) { - threadState := &threadState{currentState: stateBooting} + threadState := &ThreadState{currentState: Booting} go func() { - threadState.waitFor(stateInactive) - assert.True(t, threadState.is(stateInactive)) - threadState.set(stateReady) + threadState.WaitFor(Inactive) + assert.True(t, threadState.Is(Inactive)) + threadState.Set(Ready) }() - threadState.set(stateInactive) - threadState.waitFor(stateReady) - assert.True(t, threadState.is(stateReady)) + threadState.Set(Inactive) + threadState.WaitFor(Ready) + assert.True(t, threadState.Is(Ready)) } func TestStateShouldHaveCorrectAmountOfSubscribers(t *testing.T) { - threadState := &threadState{currentState: stateBooting} + threadState := &ThreadState{currentState: Booting} // 3 subscribers waiting for different states - go threadState.waitFor(stateInactive) - go threadState.waitFor(stateInactive, stateShuttingDown) - go threadState.waitFor(stateShuttingDown) + go threadState.WaitFor(Inactive) + go threadState.WaitFor(Inactive, ShuttingDown) + go threadState.WaitFor(ShuttingDown) assertNumberOfSubscribers(t, threadState, 3) - threadState.set(stateInactive) + threadState.Set(Inactive) assertNumberOfSubscribers(t, threadState, 1) - assert.True(t, threadState.compareAndSwap(stateInactive, stateShuttingDown)) + assert.True(t, threadState.CompareAndSwap(Inactive, ShuttingDown)) assertNumberOfSubscribers(t, threadState, 0) } -func assertNumberOfSubscribers(t *testing.T, threadState *threadState, expected int) { +func assertNumberOfSubscribers(t *testing.T, threadState *ThreadState, expected int) { + t.Helper() for range 10_000 { // wait for 1 second max time.Sleep(100 * time.Microsecond) threadState.mu.RLock() diff --git a/phpmainthread.go b/phpmainthread.go index 246cb6e2..cecadc16 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -14,12 +14,13 @@ import ( "github.com/dunglas/frankenphp/internal/memory" "github.com/dunglas/frankenphp/internal/phpheaders" + "github.com/dunglas/frankenphp/internal/state" ) // represents the main PHP thread // the thread needs to keep running as long as all other threads are running type phpMainThread struct { - state *threadState + state *state.ThreadState done chan struct{} numThreads int maxThreads int @@ -39,7 +40,7 @@ var ( // and reserves a fixed number of possible PHP threads func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) { mainThread = &phpMainThread{ - state: newThreadState(), + state: state.NewThreadState(), done: make(chan struct{}), numThreads: numThreads, maxThreads: numMaxThreads, @@ -80,11 +81,11 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) func drainPHPThreads() { doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) - mainThread.state.set(stateShuttingDown) + mainThread.state.Set(state.ShuttingDown) close(mainThread.done) for _, thread := range phpThreads { // shut down all reserved threads - if thread.state.compareAndSwap(stateReserved, stateDone) { + if thread.state.CompareAndSwap(state.Reserved, state.Done) { doneWG.Done() continue } @@ -96,8 +97,8 @@ func drainPHPThreads() { } doneWG.Wait() - mainThread.state.set(stateDone) - mainThread.state.waitFor(stateReserved) + mainThread.state.Set(state.Done) + mainThread.state.WaitFor(state.Reserved) phpThreads = nil } @@ -106,7 +107,7 @@ func (mainThread *phpMainThread) start() error { return ErrMainThreadCreation } - mainThread.state.waitFor(stateReady) + mainThread.state.WaitFor(state.Ready) // cache common request headers as zend_strings (HTTP_ACCEPT, HTTP_USER_AGENT, etc.) mainThread.commonHeaders = make(map[string]*C.zend_string, len(phpheaders.CommonRequestHeaders)) @@ -125,13 +126,13 @@ func (mainThread *phpMainThread) start() error { func getInactivePHPThread() *phpThread { for _, thread := range phpThreads { - if thread.state.is(stateInactive) { + if thread.state.Is(state.Inactive) { return thread } } for _, thread := range phpThreads { - if thread.state.compareAndSwap(stateReserved, stateBootRequested) { + if thread.state.CompareAndSwap(state.Reserved, state.BootRequested) { thread.boot() return thread } @@ -147,8 +148,8 @@ func go_frankenphp_main_thread_is_ready() { mainThread.maxThreads = mainThread.numThreads } - mainThread.state.set(stateReady) - mainThread.state.waitFor(stateDone) + mainThread.state.Set(state.Ready) + mainThread.state.WaitFor(state.Done) } // max_threads = auto @@ -174,7 +175,7 @@ func (mainThread *phpMainThread) setAutomaticMaxThreads() { //export go_frankenphp_shutdown_main_thread func go_frankenphp_shutdown_main_thread() { - mainThread.state.set(stateReserved) + mainThread.state.Set(state.Reserved) } //export go_get_custom_php_ini diff --git a/phpmainthread_test.go b/phpmainthread_test.go index bf842ef6..90aa2033 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -12,7 +12,7 @@ import ( "testing" "time" - "github.com/dunglas/frankenphp/internal/phpheaders" + "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -32,7 +32,7 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { assert.Len(t, phpThreads, 1) assert.Equal(t, 0, phpThreads[0].threadIndex) - assert.True(t, phpThreads[0].state.is(stateInactive)) + assert.True(t, phpThreads[0].state.Is(state.Inactive)) drainPHPThreads() @@ -167,7 +167,7 @@ func TestFinishBootingAWorkerScript(t *testing.T) { // boot the worker worker := getDummyWorker(t, "transition-worker-1.php") convertToWorkerThread(phpThreads[0], worker) - phpThreads[0].state.waitFor(stateReady) + phpThreads[0].state.WaitFor(state.Ready) assert.NotNil(t, phpThreads[0].handler.(*workerThread).dummyContext) assert.Nil(t, phpThreads[0].handler.(*workerThread).workerContext) @@ -209,9 +209,8 @@ func getDummyWorker(t *testing.T, fileName string) *worker { } worker, _ := newWorker(workerOpt{ - fileName: testDataPath + "/" + fileName, - num: 1, - maxConsecutiveFailures: defaultMaxConsecutiveFailures, + fileName: testDataPath + "/" + fileName, + num: 1, }) workers = append(workers, worker) @@ -237,7 +236,7 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT convertToRegularThread, func(thread *phpThread) { thread.shutdown() }, func(thread *phpThread) { - if thread.state.is(stateReserved) { + if thread.state.Is(state.Reserved) { thread.boot() } }, @@ -248,20 +247,6 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT } } -func TestAllCommonHeadersAreCorrect(t *testing.T) { - fakeRequest := httptest.NewRequest("GET", "http://localhost", nil) - - for header, phpHeader := range phpheaders.CommonRequestHeaders { - // verify that common and uncommon headers return the same result - expectedPHPHeader := phpheaders.GetUnCommonHeader(t.Context(), header) - assert.Equal(t, phpHeader+"\x00", expectedPHPHeader, "header is not well formed: "+phpHeader) - - // net/http will capitalize lowercase headers, verify that headers are capitalized - fakeRequest.Header.Add(header, "foo") - assert.Contains(t, fakeRequest.Header, header, "header is not correctly capitalized: "+header) - } -} - func TestCorrectThreadCalculation(t *testing.T) { maxProcs := runtime.GOMAXPROCS(0) * 2 oneWorkerThread := []workerOpt{{num: 1}} diff --git a/phpthread.go b/phpthread.go index 7c66f2b7..1726cf9d 100644 --- a/phpthread.go +++ b/phpthread.go @@ -8,6 +8,8 @@ import ( "runtime" "sync" "unsafe" + + "github.com/dunglas/frankenphp/internal/state" ) // representation of the actual underlying PHP thread @@ -19,7 +21,7 @@ type phpThread struct { drainChan chan struct{} handlerMu sync.Mutex handler threadHandler - state *threadState + state *state.ThreadState sandboxedEnv map[string]*C.zend_string } @@ -36,15 +38,15 @@ func newPHPThread(threadIndex int) *phpThread { return &phpThread{ threadIndex: threadIndex, requestChan: make(chan contextHolder), - state: newThreadState(), + state: state.NewThreadState(), } } // boot starts the underlying PHP thread func (thread *phpThread) boot() { // thread must be in reserved state to boot - if !thread.state.compareAndSwap(stateReserved, stateBooting) && !thread.state.compareAndSwap(stateBootRequested, stateBooting) { - panic("thread is not in reserved state: " + thread.state.name()) + if !thread.state.CompareAndSwap(state.Reserved, state.Booting) && !thread.state.CompareAndSwap(state.BootRequested, state.Booting) { + panic("thread is not in reserved state: " + thread.state.Name()) } // boot threads as inactive @@ -58,22 +60,22 @@ func (thread *phpThread) boot() { panic("unable to create thread") } - thread.state.waitFor(stateInactive) + thread.state.WaitFor(state.Inactive) } // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { - if !thread.state.requestSafeStateChange(stateShuttingDown) { + if !thread.state.RequestSafeStateChange(state.ShuttingDown) { // already shutting down or done return } close(thread.drainChan) - thread.state.waitFor(stateDone) + thread.state.WaitFor(state.Done) thread.drainChan = make(chan struct{}) // threads go back to the reserved state from which they can be booted again - if mainThread.state.is(stateReady) { - thread.state.set(stateReserved) + if mainThread.state.Is(state.Ready) { + thread.state.Set(state.Reserved) } } @@ -82,24 +84,23 @@ func (thread *phpThread) shutdown() { func (thread *phpThread) setHandler(handler threadHandler) { thread.handlerMu.Lock() defer thread.handlerMu.Unlock() - - if !thread.state.requestSafeStateChange(stateTransitionRequested) { + if !thread.state.RequestSafeStateChange(state.TransitionRequested) { // no state change allowed == shutdown or done return } close(thread.drainChan) - thread.state.waitFor(stateTransitionInProgress) + thread.state.WaitFor(state.TransitionInProgress) thread.handler = handler thread.drainChan = make(chan struct{}) - thread.state.set(stateTransitionComplete) + thread.state.Set(state.TransitionComplete) } // transition to a new handler safely // is triggered by setHandler and executed on the PHP thread func (thread *phpThread) transitionToNewHandler() string { - thread.state.set(stateTransitionInProgress) - thread.state.waitFor(stateTransitionComplete) + thread.state.Set(state.TransitionInProgress) + thread.state.WaitFor(state.TransitionComplete) // execute beforeScriptExecution of the new handler return thread.handler.beforeScriptExecution() @@ -142,6 +143,10 @@ func (thread *phpThread) pinCString(s string) *C.char { return thread.pinString(s + "\x00") } +func (*phpThread) updateContext(isWorker bool) { + C.frankenphp_update_local_thread_context(C.bool(isWorker)) +} + //export go_frankenphp_before_script_execution func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { thread := phpThreads[threadIndex] @@ -172,5 +177,5 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.set(stateDone) + thread.state.Set(state.Done) } diff --git a/scaling.go b/scaling.go index af7c6c29..37e081ab 100644 --- a/scaling.go +++ b/scaling.go @@ -10,6 +10,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/cpu" + "github.com/dunglas/frankenphp/internal/state" ) const ( @@ -67,7 +68,7 @@ func addRegularThread() (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToRegularThread(thread) - thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved) return thread, nil } @@ -77,7 +78,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { return nil, ErrMaxThreadsReached } convertToWorkerThread(thread, worker) - thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved) return thread, nil } @@ -86,7 +87,7 @@ func scaleWorkerThread(worker *worker) { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.is(stateReady) { + if !mainThread.state.Is(state.Ready) { return } @@ -116,7 +117,7 @@ func scaleRegularThread() { scalingMu.Lock() defer scalingMu.Unlock() - if !mainThread.state.is(stateReady) { + if !mainThread.state.Is(state.Ready) { return } @@ -212,18 +213,18 @@ func deactivateThreads() { thread := autoScaledThreads[i] // the thread might have been stopped otherwise, remove it - if thread.state.is(stateReserved) { + if thread.state.Is(state.Reserved) { autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) continue } - waitTime := thread.state.waitTime() + waitTime := thread.state.WaitTime() if stoppedThreadCount > maxTerminationCount || waitTime == 0 { continue } // convert threads to inactive if they have been idle for too long - if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() { + if thread.state.Is(state.Ready) && waitTime > maxThreadIdleTime.Milliseconds() { convertToInactiveThread(thread) stoppedThreadCount++ autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) @@ -238,7 +239,7 @@ func deactivateThreads() { // TODO: Completely stopping threads is more memory efficient // Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory) // Reactivate this if there is a better solution or workaround - // if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() { + // if thread.state.Is(state.Inactive) && waitTime > maxThreadIdleTime.Milliseconds() { // logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("thread", thread.threadIndex)) // thread.shutdown() // stoppedThreadCount++ diff --git a/scaling_test.go b/scaling_test.go index 89e04b51..da7a4d29 100644 --- a/scaling_test.go +++ b/scaling_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/dunglas/frankenphp/internal/state" "github.com/stretchr/testify/assert" ) @@ -20,7 +21,7 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) { // scale up scaleRegularThread() - assert.Equal(t, stateReady, autoScaledThread.state.get()) + assert.Equal(t, state.Ready, autoScaledThread.state.Get()) assert.IsType(t, ®ularThread{}, autoScaledThread.handler) // on down-scale, the thread will be marked as inactive @@ -49,7 +50,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) { // scale up scaleWorkerThread(getWorkerByPath(workerPath)) - assert.Equal(t, stateReady, autoScaledThread.state.get()) + assert.Equal(t, state.Ready, autoScaledThread.state.Get()) // on down-scale, the thread will be marked as inactive setLongWaitTime(autoScaledThread) @@ -60,7 +61,5 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) { } func setLongWaitTime(thread *phpThread) { - thread.state.mu.Lock() - thread.state.waitingSince = time.Now().Add(-time.Hour) - thread.state.mu.Unlock() + thread.state.SetWaitTime(time.Now().Add(-time.Hour)) } diff --git a/testdata/failing-worker.php b/testdata/failing-worker.php index 108d2ff8..0bb001f1 100644 --- a/testdata/failing-worker.php +++ b/testdata/failing-worker.php @@ -1,18 +1,7 @@ = 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures { + startupFailChan <- fmt.Errorf("too many consecutive failures: worker %s has not reached frankenphp_handle_request()", worker.fileName) + handler.thread.state.Set(state.ShuttingDown) + return } - // panic after exponential backoff if the worker has never reached frankenphp_handle_request - if handler.backoff.recordFailure() { - if !watcherIsEnabled && !handler.state.is(stateReady) { - panic("too many consecutive worker failures") + if watcherIsEnabled { + // worker script has probably failed due to script changes while watcher is enabled + if globalLogger.Enabled(globalCtx, slog.LevelError) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "(watcher enabled) worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } - + } else { + // rare case where worker script has failed on a restart during normal operation + // this can happen if startup success depends on external resources if globalLogger.Enabled(globalCtx, slog.LevelWarn) { - globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount)) + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount)) } } + + // wait a bit and try again (exponential backoff) + backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond + if backoffDuration > time.Second { + backoffDuration = time.Second + } + handler.failureCount++ + time.Sleep(backoffDuration) } // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script. @@ -194,20 +203,21 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { // Clear the first dummy request created to initialize the worker if handler.isBootingScript { handler.isBootingScript = false + handler.failureCount = 0 if !C.frankenphp_shutdown_dummy_request() { panic("Not in CGI context") } } // worker threads are 'ready' after they first reach frankenphp_handle_request() - // 'stateTransitionComplete' is only true on the first boot of the worker script, + // 'state.TransitionComplete' is only true on the first boot of the worker script, // while 'isBootingScript' is true on every boot of the worker script - if handler.state.is(stateTransitionComplete) { + if handler.state.Is(state.TransitionComplete) { metrics.ReadyWorker(handler.worker.name) - handler.state.set(stateReady) + handler.state.Set(state.Ready) } - handler.state.markAsWaiting(true) + handler.state.MarkAsWaiting(true) var requestCH contextHolder select { @@ -218,7 +228,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { // flush the opcache when restarting due to watcher or admin api // note: this is done right before frankenphp_handle_request() returns 'false' - if handler.state.is(stateRestarting) { + if handler.state.Is(state.Restarting) { C.frankenphp_reset_opcache() } @@ -229,7 +239,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { handler.workerContext = requestCH.ctx handler.workerFrankenPHPContext = requestCH.frankenPHPContext - handler.state.markAsWaiting(false) + handler.state.MarkAsWaiting(false) if globalLogger.Enabled(requestCH.ctx, slog.LevelDebug) { if handler.workerFrankenPHPContext.request == nil { diff --git a/worker.go b/worker.go index 5b83f675..4960ecc7 100644 --- a/worker.go +++ b/worker.go @@ -13,6 +13,7 @@ import ( "time" "github.com/dunglas/frankenphp/internal/fastabs" + "github.com/dunglas/frankenphp/internal/state" "github.com/dunglas/frankenphp/internal/watcher" ) @@ -36,22 +37,26 @@ type worker struct { var ( workers []*worker watcherIsEnabled bool + startupFailChan chan (error) ) func initWorkers(opt []workerOpt) error { workers = make([]*worker, 0, len(opt)) directoriesToWatch := getDirectoriesToWatch(opt) watcherIsEnabled = len(directoriesToWatch) > 0 + totalThreadsToStart := 0 for _, o := range opt { w, err := newWorker(o) if err != nil { return err } + totalThreadsToStart += w.num workers = append(workers, w) } var workersReady sync.WaitGroup + startupFailChan = make(chan error, totalThreadsToStart) for _, w := range workers { for i := 0; i < w.num; i++ { @@ -59,18 +64,27 @@ func initWorkers(opt []workerOpt) error { convertToWorkerThread(thread, w) workersReady.Go(func() { - thread.state.waitFor(stateReady) + thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done) }) } } workersReady.Wait() + select { + case err := <-startupFailChan: + // at least 1 worker has failed, shut down and return an error + Shutdown() + return fmt.Errorf("failed to initialize workers: %w", err) + default: + // all workers started successfully + startupFailChan = nil + } + if !watcherIsEnabled { return nil } - watcherIsEnabled = true if err := watcher.InitWatcher(globalCtx, directoriesToWatch, RestartWorkers, globalLogger); err != nil { return err } @@ -167,7 +181,7 @@ func drainWorkerThreads() []*phpThread { worker.threadMutex.RLock() ready.Add(len(worker.threads)) for _, thread := range worker.threads { - if !thread.state.requestSafeStateChange(stateRestarting) { + if !thread.state.RequestSafeStateChange(state.Restarting) { ready.Done() // no state change allowed == thread is shutting down // we'll proceed to restart all other threads anyways @@ -176,7 +190,7 @@ func drainWorkerThreads() []*phpThread { close(thread.drainChan) drainedThreads = append(drainedThreads, thread) go func(thread *phpThread) { - thread.state.waitFor(stateYielding) + thread.state.WaitFor(state.Yielding) ready.Done() }(thread) } @@ -203,7 +217,7 @@ func RestartWorkers() { for _, thread := range threadsToRestart { thread.drainChan = make(chan struct{}) - thread.state.set(stateReady) + thread.state.Set(state.Ready) } }