Files
frankenphp/thread-worker.go
Alliballibaba2 f592e0f47b refactor: decouple worker threads from non-worker threads (#1137)
* Decouple workers.

* Moves code to separate file.

* Cleans up the exponential backoff.

* Initial working implementation.

* Refactors php threads to take callbacks.

* Cleanup.

* Cleanup.

* Cleanup.

* Cleanup.

* Adjusts watcher logic.

* Adjusts the watcher logic.

* Fix opcache_reset race condition.

* Fixing merge conflicts and formatting.

* Prevents overlapping of TSRM reservation and script execution.

* Adjustments as suggested by @dunglas.

* Adds error assertions.

* Adds comments.

* Removes logs and explicitly compares to C.false.

* Resets check.

* Adds cast for safety.

* Fixes waitgroup overflow.

* Resolves waitgroup race condition on startup.

* Moves worker request logic to worker.go.

* Removes defer.

* Removes call from go to c.

* Fixes merge conflict.

* Adds fibers test back in.

* Refactors new thread loop approach.

* Removes redundant check.

* Adds compareAndSwap.

* Refactor: removes global waitgroups and uses a 'thread state' abstraction instead.

* Removes unnecessary method.

* Updates comment.

* Removes unnecessary booleans.

* test

* First state machine steps.

* Splits threads.

* Minimal working implementation with broken tests.

* Fixes tests.

* Refactoring.

* Fixes merge conflicts.

* Formatting

* C formatting.

* More cleanup.

* Allows for clean state transitions.

* Adds state tests.

* Adds support for thread transitioning.

* Fixes the testdata path.

* Formatting.

* Allows transitioning back to inactive state.

* Fixes go linting.

* Formatting.

* Removes duplication.

* Applies suggestions by @dunglas

* Removes redundant check.

* Locks the handler on restart.

* Removes unnecessary log.

* Changes Unpin() logic as suggested by @withinboredom

* Adds suggestions by @dunglas and resolves TODO.

* Makes restarts fully safe.

* Will make the initial startup fail even if the watcher is enabled (as is currently the case)

* Also adds compareAndSwap to the test.

* Adds comment.

* Prevents panic on initial watcher startup.
2024-12-17 11:28:51 +01:00

223 lines
6.6 KiB
Go

package frankenphp
// #include "frankenphp.h"
import "C"
import (
"net/http"
"path/filepath"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// representation of a thread assigned to a worker script
// executes the PHP worker script in a loop
// implements the threadHandler interface
type workerThread struct {
state *threadState
thread *phpThread
worker *worker
fakeRequest *http.Request
workerRequest *http.Request
backoff *exponentialBackoff
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
worker: worker,
backoff: &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
},
})
worker.attachThread(thread)
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
func (handler *workerThread) afterScriptExecution(exitStatus int) {
tearDownWorkerScript(handler, exitStatus)
}
func (handler *workerThread) getActiveRequest() *http.Request {
if handler.workerRequest != nil {
return handler.workerRequest
}
return handler.fakeRequest
}
func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.wait()
metrics.StartWorker(worker.fileName)
// Create a dummy request to set up the worker
r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
if err != nil {
panic(err)
}
r, err = NewRequestWithContext(
r,
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
)
if err != nil {
panic(err)
}
if err := updateServerContext(handler.thread, r, true, false); err != nil {
panic(err)
}
handler.fakeRequest = r
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
}
}
func tearDownWorkerScript(handler *workerThread, exitStatus int) {
// if the worker request is not nil, the script might have crashed
// make sure to close the worker request context
if handler.workerRequest != nil {
fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
handler.workerRequest = nil
}
fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
defer func() {
handler.fakeRequest = nil
}()
// on exit status 0 we just run the worker script again
worker := handler.worker
if fc.exitStatus == 0 {
// TODO: make the max restart configurable
metrics.StopWorker(worker.fileName, StopReasonRestart)
handler.backoff.recordSuccess()
if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
return
}
// TODO: error status
// on exit status 1 we apply an exponential backoff when restarting
metrics.StopWorker(worker.fileName, StopReasonCrash)
if handler.backoff.recordFailure() {
if !watcherIsEnabled {
logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
}
}
func (handler *workerThread) waitForWorkerRequest() bool {
// unpin any memory left over from previous requests
handler.thread.Unpin()
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
}
if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
metrics.ReadyWorker(handler.worker.fileName)
}
var r *http.Request
select {
case <-handler.thread.drainChan:
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
}
// execute opcache_reset if the restart was triggered by the watcher
if watcherIsEnabled && handler.state.is(stateRestarting) {
C.frankenphp_reset_opcache()
}
return false
case r = <-handler.thread.requestChan:
case r = <-handler.worker.requestChan:
}
handler.workerRequest = r
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
}
if err := updateServerContext(handler.thread, r, false, true); err != nil {
// Unexpected error or invalid request
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
}
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
rejectRequest(fc.responseWriter, err.Error())
maybeCloseContext(fc)
handler.workerRequest = nil
return handler.waitForWorkerRequest()
}
return true
}
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
handler := phpThreads[threadIndex].handler.(*workerThread)
return C.bool(handler.waitForWorkerRequest())
}
//export go_frankenphp_finish_worker_request
func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
thread := phpThreads[threadIndex]
r := thread.getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
thread.handler.(*workerThread).workerRequest = nil
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
}
}
// when frankenphp_finish_request() is directly called from PHP
//
//export go_frankenphp_finish_php_request
func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
r := phpThreads[threadIndex].getActiveRequest()
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("url", r.RequestURI))
}
}