feat: Adds automatic thread scaling at runtime and php_ini configuration in Caddyfile (#1266)

Adds an option to scale threads at runtime

Adds php_ini configuration in Caddyfile
This commit is contained in:
Alliballibaba2
2025-02-19 20:39:33 +01:00
committed by GitHub
parent 965fa6570c
commit 072151dfee
46 changed files with 1772 additions and 208 deletions

View File

@@ -19,7 +19,7 @@ runs:
name: Compile e-dant/watcher
run: |
mkdir watcher
gh release download --repo e-dant/watcher -A tar.gz -O - | tar -xz -C watcher --strip-components 1
gh release download 0.13.2 --repo e-dant/watcher -A tar.gz -O - | tar -xz -C watcher --strip-components 1
cd watcher
cmake -S . -B build -DCMAKE_BUILD_TYPE=Release
cmake --build build

65
caddy/admin.go Normal file
View File

@@ -0,0 +1,65 @@
package caddy
import (
"encoding/json"
"fmt"
"github.com/caddyserver/caddy/v2"
"github.com/dunglas/frankenphp"
"net/http"
)
type FrankenPHPAdmin struct{}
// if the id starts with "admin.api" the module will register AdminRoutes via module.Routes()
func (FrankenPHPAdmin) CaddyModule() caddy.ModuleInfo {
return caddy.ModuleInfo{
ID: "admin.api.frankenphp",
New: func() caddy.Module { return new(FrankenPHPAdmin) },
}
}
// EXPERIMENTAL: These routes are not yet stable and may change in the future.
func (admin FrankenPHPAdmin) Routes() []caddy.AdminRoute {
return []caddy.AdminRoute{
{
Pattern: "/frankenphp/workers/restart",
Handler: caddy.AdminHandlerFunc(admin.restartWorkers),
},
{
Pattern: "/frankenphp/threads",
Handler: caddy.AdminHandlerFunc(admin.threads),
},
}
}
func (admin *FrankenPHPAdmin) restartWorkers(w http.ResponseWriter, r *http.Request) error {
if r.Method != http.MethodPost {
return admin.error(http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
}
frankenphp.RestartWorkers()
caddy.Log().Info("workers restarted from admin api")
admin.success(w, "workers restarted successfully\n")
return nil
}
func (admin *FrankenPHPAdmin) threads(w http.ResponseWriter, r *http.Request) error {
debugState := frankenphp.DebugState()
prettyJson, err := json.MarshalIndent(debugState, "", " ")
if err != nil {
return admin.error(http.StatusInternalServerError, err)
}
return admin.success(w, string(prettyJson))
}
func (admin *FrankenPHPAdmin) success(w http.ResponseWriter, message string) error {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(message))
return err
}
func (admin *FrankenPHPAdmin) error(statusCode int, err error) error {
return caddy.APIError{HTTPStatus: statusCode, Err: err}
}
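
For context, a minimal sketch of how the restart endpoint could be called from Go, assuming Caddy's admin API is enabled on its default `localhost:2019` (the tests below use `localhost:2999` instead):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed: the Caddy admin API is reachable on localhost:2019.
	resp, err := http.Post("http://localhost:2019/frankenphp/workers/restart", "text/plain", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body)) // 200 workers restarted successfully
}
```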

215
caddy/admin_test.go Normal file
View File

@@ -0,0 +1,215 @@
package caddy_test
import (
"encoding/json"
"io"
"net/http"
"sync"
"testing"
"github.com/caddyserver/caddy/v2/caddytest"
"github.com/dunglas/frankenphp"
"github.com/stretchr/testify/assert"
)
func TestRestartWorkerViaAdminApi(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
worker ../testdata/worker-with-counter.php 1
}
}
localhost:`+testPort+` {
route {
root ../testdata
rewrite worker-with-counter.php
php
}
}
`, "caddyfile")
tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:1")
tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:2")
assertAdminResponse(t, tester, "POST", "workers/restart", http.StatusOK, "workers restarted successfully\n")
tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:1")
}
func TestShowTheCorrectThreadDebugStatus(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
num_threads 3
max_threads 6
worker ../testdata/worker-with-counter.php 1
worker ../testdata/index.php 1
}
}
localhost:`+testPort+` {
route {
root ../testdata
rewrite worker-with-counter.php
php
}
}
`, "caddyfile")
debugState := getDebugState(t, tester)
// assert that the correct threads are present in the thread info
assert.Equal(t, debugState.ThreadDebugStates[0].State, "ready")
assert.Contains(t, debugState.ThreadDebugStates[1].Name, "worker-with-counter.php")
assert.Contains(t, debugState.ThreadDebugStates[2].Name, "index.php")
assert.Equal(t, debugState.ReservedThreadCount, 3)
assert.Len(t, debugState.ThreadDebugStates, 3)
}
func TestAutoScaleWorkerThreads(t *testing.T) {
wg := sync.WaitGroup{}
maxTries := 10
requestsPerTry := 200
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
max_threads 10
num_threads 2
worker ../testdata/sleep.php 1
}
}
localhost:`+testPort+` {
route {
root ../testdata
rewrite sleep.php
php
}
}
`, "caddyfile")
// spam an endpoint that simulates IO
endpoint := "http://localhost:" + testPort + "/?sleep=2&work=1000"
amountOfThreads := len(getDebugState(t, tester).ThreadDebugStates)
// try to spawn the additional threads by spamming the server
for tries := 0; tries < maxTries; tries++ {
wg.Add(requestsPerTry)
for i := 0; i < requestsPerTry; i++ {
go func() {
tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 2 ms and worked for 1000 iterations")
wg.Done()
}()
}
wg.Wait()
amountOfThreads = len(getDebugState(t, tester).ThreadDebugStates)
if amountOfThreads > 2 {
break
}
}
// assert that there are now more threads than before
assert.NotEqual(t, amountOfThreads, 2)
}
// Note: this test requires at least 2x40MB of available memory for the process
func TestAutoScaleRegularThreadsOnAutomaticThreadLimit(t *testing.T) {
wg := sync.WaitGroup{}
maxTries := 10
requestsPerTry := 200
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
max_threads auto
num_threads 1
php_ini memory_limit 40M # a reasonable limit for the test
}
}
localhost:`+testPort+` {
route {
root ../testdata
php
}
}
`, "caddyfile")
// spam an endpoint that simulates IO
endpoint := "http://localhost:" + testPort + "/sleep.php?sleep=2&work=1000"
amountOfThreads := len(getDebugState(t, tester).ThreadDebugStates)
// try to spawn the additional threads by spamming the server
for tries := 0; tries < maxTries; tries++ {
wg.Add(requestsPerTry)
for i := 0; i < requestsPerTry; i++ {
go func() {
tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 2 ms and worked for 1000 iterations")
wg.Done()
}()
}
wg.Wait()
amountOfThreads = len(getDebugState(t, tester).ThreadDebugStates)
if amountOfThreads > 1 {
break
}
}
// assert that there are now more threads present
assert.NotEqual(t, amountOfThreads, 1)
}
func assertAdminResponse(t *testing.T, tester *caddytest.Tester, method string, path string, expectedStatus int, expectedBody string) {
adminUrl := "http://localhost:2999/frankenphp/"
r, err := http.NewRequest(method, adminUrl+path, nil)
assert.NoError(t, err)
if expectedBody == "" {
_ = tester.AssertResponseCode(r, expectedStatus)
return
}
_, _ = tester.AssertResponse(r, expectedStatus, expectedBody)
}
func getAdminResponseBody(t *testing.T, tester *caddytest.Tester, method string, path string) string {
adminUrl := "http://localhost:2999/frankenphp/"
r, err := http.NewRequest(method, adminUrl+path, nil)
assert.NoError(t, err)
resp := tester.AssertResponseCode(r, http.StatusOK)
defer resp.Body.Close()
bytes, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
return string(bytes)
}
func getDebugState(t *testing.T, tester *caddytest.Tester) frankenphp.FrankenPHPDebugState {
threadStates := getAdminResponseBody(t, tester, "GET", "threads")
var debugStates frankenphp.FrankenPHPDebugState
err := json.Unmarshal([]byte(threadStates), &debugStates)
assert.NoError(t, err)
return debugStates
}

View File

@@ -27,9 +27,12 @@ import (
const defaultDocumentRoot = "public"
var iniError = errors.New("'php_ini' must be in the format: php_ini \"<key>\" \"<value>\"")
func init() {
caddy.RegisterModule(FrankenPHPApp{})
caddy.RegisterModule(FrankenPHPModule{})
caddy.RegisterModule(FrankenPHPAdmin{})
httpcaddyfile.RegisterGlobalOption("frankenphp", parseGlobalOption)
@@ -54,8 +57,12 @@ type workerConfig struct {
type FrankenPHPApp struct {
// NumThreads sets the number of PHP threads to start. Default: 2x the number of available CPUs.
NumThreads int `json:"num_threads,omitempty"`
// MaxThreads limits how many threads can be started at runtime. Default: NumThreads
MaxThreads int `json:"max_threads,omitempty"`
// Workers configures the worker scripts to start.
Workers []workerConfig `json:"workers,omitempty"`
// Overrides the default php.ini configuration
PhpIni map[string]string `json:"php_ini,omitempty"`
metrics frankenphp.Metrics
logger *zap.Logger
@@ -80,7 +87,13 @@ func (f *FrankenPHPApp) Provision(ctx caddy.Context) error {
func (f *FrankenPHPApp) Start() error {
repl := caddy.NewReplacer()
opts := []frankenphp.Option{frankenphp.WithNumThreads(f.NumThreads), frankenphp.WithLogger(f.logger), frankenphp.WithMetrics(f.metrics)}
opts := []frankenphp.Option{
frankenphp.WithNumThreads(f.NumThreads),
frankenphp.WithMaxThreads(f.MaxThreads),
frankenphp.WithLogger(f.logger),
frankenphp.WithMetrics(f.metrics),
frankenphp.WithPhpIni(f.PhpIni),
}
for _, w := range f.Workers {
opts = append(opts, frankenphp.WithWorkers(repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch))
}
@@ -126,6 +139,58 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
f.NumThreads = v
case "max_threads":
if !d.NextArg() {
return d.ArgErr()
}
if d.Val() == "auto" {
f.MaxThreads = -1
continue
}
v, err := strconv.ParseUint(d.Val(), 10, 32)
if err != nil {
return err
}
f.MaxThreads = int(v)
case "php_ini":
parseIniLine := func(d *caddyfile.Dispenser) error {
key := d.Val()
if !d.NextArg() {
return iniError
}
if f.PhpIni == nil {
f.PhpIni = make(map[string]string)
}
f.PhpIni[key] = d.Val()
if d.NextArg() {
return iniError
}
return nil
}
isBlock := false
for d.NextBlock(1) {
isBlock = true
err := parseIniLine(d)
if err != nil {
return err
}
}
if !isBlock {
if !d.NextArg() {
return iniError
}
err := parseIniLine(d)
if err != nil {
return err
}
}
case "worker":
wc := workerConfig{}
if d.NextArg() {
@@ -192,6 +257,10 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
}
if f.MaxThreads > 0 && f.NumThreads > 0 && f.MaxThreads < f.NumThreads {
return errors.New("'max_threads' must be greater than or equal to 'num_threads'")
}
return nil
}
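
As a rough illustration of what this parsing produces, here is a sketch of the app struct a Caddyfile with `num_threads 4`, `max_threads auto` and a `php_ini` block maps onto (assuming the module is imported as `github.com/dunglas/frankenphp/caddy`, which requires the PHP build toolchain):

```go
package main

import (
	"encoding/json"
	"fmt"

	frankenphpcaddy "github.com/dunglas/frankenphp/caddy"
)

func main() {
	// Equivalent of:
	//
	//   frankenphp {
	//       num_threads 4
	//       max_threads auto   # stored as -1, resolved at startup
	//       php_ini {
	//           memory_limit 256M
	//           max_execution_time 30
	//       }
	//   }
	app := frankenphpcaddy.FrankenPHPApp{
		NumThreads: 4,
		MaxThreads: -1,
		PhpIni: map[string]string{
			"memory_limit":       "256M",
			"max_execution_time": "30",
		},
	}

	out, _ := json.MarshalIndent(app, "", "  ")
	fmt.Println(string(out))
}
```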

View File

@@ -648,3 +648,71 @@ func TestAllDefinedServerVars(t *testing.T) {
expectedBody,
)
}
func TestPHPIniConfiguration(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
num_threads 2
worker ../testdata/ini.php 1
php_ini max_execution_time 100
php_ini memory_limit 10000000
}
}
localhost:`+testPort+` {
route {
root ../testdata
php
}
}
`, "caddyfile")
testSingleIniConfiguration(tester, "max_execution_time", "100")
testSingleIniConfiguration(tester, "memory_limit", "10000000")
}
func TestPHPIniBlockConfiguration(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
frankenphp {
num_threads 1
php_ini {
max_execution_time 15
memory_limit 20000000
}
}
}
localhost:`+testPort+` {
route {
root ../testdata
php
}
}
`, "caddyfile")
testSingleIniConfiguration(tester, "max_execution_time", "15")
testSingleIniConfiguration(tester, "memory_limit", "20000000")
}
func testSingleIniConfiguration(tester *caddytest.Tester, key string, value string) {
// test twice to ensure the ini setting is not lost
for i := 0; i < 2; i++ {
tester.AssertGetResponse(
"http://localhost:"+testPort+"/ini.php?key="+key,
http.StatusOK,
key+":"+value,
)
}
}

View File

@@ -19,7 +19,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
frankenphp {
worker {
file ../testdata/worker-with-watcher.php
file ../testdata/worker-with-counter.php
num 1
watch ./**/*.php
}
@@ -28,7 +28,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
localhost:`+testPort+` {
root ../testdata
rewrite worker-with-watcher.php
rewrite worker-with-counter.php
php
}
`, "caddyfile")

46
debugstate.go Normal file
View File

@@ -0,0 +1,46 @@
package frankenphp
// EXPERIMENTAL: ThreadDebugState represents the state of a single PHP thread - debugging purposes only
type ThreadDebugState struct {
Index int
Name string
State string
IsWaiting bool
IsBusy bool
WaitingSinceMilliseconds int64
}
// EXPERIMENTAL: FrankenPHPDebugState represents the state of all PHP threads - debugging purposes only
type FrankenPHPDebugState struct {
ThreadDebugStates []ThreadDebugState
ReservedThreadCount int
}
// EXPERIMENTAL: DebugState returns the state of all PHP threads - debugging purposes only
func DebugState() FrankenPHPDebugState {
fullState := FrankenPHPDebugState{
ThreadDebugStates: make([]ThreadDebugState, 0, len(phpThreads)),
ReservedThreadCount: 0,
}
for _, thread := range phpThreads {
if thread.state.is(stateReserved) {
fullState.ReservedThreadCount++
continue
}
fullState.ThreadDebugStates = append(fullState.ThreadDebugStates, threadDebugState(thread))
}
return fullState
}
// threadDebugState creates a small jsonable status message for debugging purposes
func threadDebugState(thread *phpThread) ThreadDebugState {
return ThreadDebugState{
Index: thread.threadIndex,
Name: thread.handler.name(),
State: thread.state.name(),
IsWaiting: thread.state.isInWaitingState(),
IsBusy: !thread.state.isInWaitingState(),
WaitingSinceMilliseconds: thread.state.waitTime(),
}
}
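
A sketch of how the JSON that the `/frankenphp/threads` admin endpoint produces for these structs could be decoded by a client, assuming the admin API listens on `localhost:2019` (importing the frankenphp package requires the PHP/cgo toolchain):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"github.com/dunglas/frankenphp"
)

func main() {
	// Assumed: the Caddy admin API is reachable on localhost:2019.
	resp, err := http.Get("http://localhost:2019/frankenphp/threads")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)

	var state frankenphp.FrankenPHPDebugState
	if err := json.Unmarshal(body, &state); err != nil {
		panic(err)
	}

	fmt.Println("reserved threads:", state.ReservedThreadCount)
	for _, t := range state.ThreadDebugStates {
		fmt.Printf("thread %d (%s): %s, waiting for %dms\n",
			t.Index, t.Name, t.State, t.WaitingSinceMilliseconds)
	}
}
```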

View File

@@ -51,6 +51,8 @@ Optionally, the number of threads to create and [worker scripts](worker.md) to s
{
frankenphp {
num_threads <num_threads> # Sets the number of PHP threads to start. Default: 2x the number of available CPUs.
max_threads <num_threads> # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'.
php_ini <key> <value> # Set a php.ini directive. Can be used several times to set multiple directives.
worker {
file <path> # Sets the path to the worker script.
num <num> # Sets the number of PHP threads to start, defaults to 2x the number of available CPUs.
@@ -227,6 +229,23 @@ To load [additional PHP configuration files](https://www.php.net/manual/en/confi
the `PHP_INI_SCAN_DIR` environment variable can be used.
When set, PHP will load all the files with the `.ini` extension present in the given directories.
You can also change the PHP configuration using the `php_ini` directive in the `Caddyfile`:
```caddyfile
{
frankenphp {
php_ini memory_limit 256M
# or
php_ini {
memory_limit 256M
max_execution_time 15
}
}
}
```
## Enable the Debug Mode
When using the Docker image, set the `CADDY_GLOBAL_OPTIONS` environment variable to `debug` to enable the debug mode:

View File

@@ -16,6 +16,16 @@ To find the right values, it's best to run load tests simulating real traffic.
To configure the number of threads, use the `num_threads` option of the `php_server` and `php` directives.
To change the number of workers, use the `num` option of the `worker` section of the `frankenphp` directive.
### `max_threads`
While it's always better to know exactly what your traffic will look like, real-life applications tend to be
unpredictable. The `max_threads` setting allows FrankenPHP to automatically spawn additional threads at runtime, up to the specified limit.
`max_threads` can help you figure out how many threads you need to handle your traffic and makes the server more resilient to latency spikes.
If set to `auto`, the limit is estimated from the `memory_limit` in your `php.ini` and the total system memory
(for example, 8 GiB of RAM and a `memory_limit` of 128M allow roughly 64 threads). If the estimate cannot be made,
`auto` falls back to 2x `num_threads`.
`max_threads` is similar to PHP-FPM's [pm.max_children](https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-children).
## Worker Mode
Enabling [the worker mode](worker.md) dramatically improves performance,

View File

@@ -128,6 +128,16 @@ A workaround to using this type of code in worker mode is to restart the worker
The previous worker snippet allows configuring a maximum number of request to handle by setting an environment variable named `MAX_REQUESTS`.
### Restart Workers Manually
While it's possible to restart workers [on file changes](config.md#watching-for-file-changes), it's also possible to restart all workers
gracefully via the [Caddy admin API](https://caddyserver.com/docs/api). If the admin is enabled in your
[Caddyfile](config.md#caddyfile-config), you can ping the restart endpoint with a simple POST request like this:
```console
curl -X POST http://localhost:2019/frankenphp/workers/restart
```
### Worker Failures
If a worker script crashes with a non-zero exit code, FrankenPHP will restart it with an exponential backoff strategy.

View File

@@ -78,7 +78,7 @@ typedef struct frankenphp_server_context {
bool finished;
} frankenphp_server_context;
__thread bool should_filter_var = 0;
bool should_filter_var = 0;
__thread frankenphp_server_context *local_ctx = NULL;
__thread uintptr_t thread_index;
__thread zval *os_environment = NULL;
@@ -767,7 +767,8 @@ frankenphp_register_variable_from_request_info(zend_string *zKey, char *value,
frankenphp_register_trusted_var(zKey, value, strlen(value),
Z_ARRVAL_P(track_vars_array));
} else if (must_be_present) {
frankenphp_register_trusted_var(zKey, "", 0, Z_ARRVAL_P(track_vars_array));
frankenphp_register_trusted_var(zKey, NULL, 0,
Z_ARRVAL_P(track_vars_array));
}
}
@@ -913,12 +914,6 @@ static void *php_thread(void *arg) {
local_ctx = malloc(sizeof(frankenphp_server_context));
/* check if a default filter is set in php.ini and only filter if
* it is, this is deprecated and will be removed in PHP 9 */
char *default_filter;
cfg_get_string("filter.default", &default_filter);
should_filter_var = default_filter != NULL;
// loop until Go signals to stop
char *scriptName = NULL;
while ((scriptName = go_frankenphp_before_script_execution(thread_index))) {
@@ -980,10 +975,22 @@ static void *php_main(void *arg) {
memcpy(frankenphp_sapi_module.ini_entries, HARDCODED_INI,
sizeof(HARDCODED_INI));
#endif
#else
/* overwrite php.ini with custom user settings */
char *php_ini_overrides = go_get_custom_php_ini();
if (php_ini_overrides != NULL) {
frankenphp_sapi_module.ini_entries = php_ini_overrides;
}
#endif
frankenphp_sapi_module.startup(&frankenphp_sapi_module);
/* check if a default filter is set in php.ini and only filter if
* it is, this is deprecated and will be removed in PHP 9 */
char *default_filter;
cfg_get_string("filter.default", &default_filter);
should_filter_var = default_filter != NULL;
go_frankenphp_main_thread_is_ready();
/* channel closed, shutdown gracefully */
@@ -1252,3 +1259,5 @@ int frankenphp_reset_opcache(void) {
}
return 0;
}
int frankenphp_get_current_memory_limit() { return PG(memory_limit); }

View File

@@ -63,8 +63,7 @@ var (
ScriptExecutionError = errors.New("error during PHP script execution")
NotRunningError = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
requestChan chan *http.Request
isRunning bool
isRunning bool
loggerMu sync.RWMutex
logger *zap.Logger
@@ -141,7 +140,8 @@ func clientHasClosed(r *http.Request) bool {
// NewRequestWithContext creates a new FrankenPHP request context.
func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Request, error) {
fc := &FrankenPHPContext{
done: make(chan interface{}),
done: make(chan interface{}),
startedAt: time.Now(),
}
for _, o := range opts {
if err := o(fc); err != nil {
@@ -244,7 +244,7 @@ func Config() PHPConfig {
// MaxThreads is internally used during tests. It is written to, but never read and may go away in the future.
var MaxThreads int
func calculateMaxThreads(opt *opt) (int, int, error) {
func calculateMaxThreads(opt *opt) (int, int, int, error) {
maxProcs := runtime.GOMAXPROCS(0) * 2
var numWorkers int
@@ -266,13 +266,17 @@ func calculateMaxThreads(opt *opt) (int, int, error) {
opt.numThreads = maxProcs
}
} else if opt.numThreads <= numWorkers {
return opt.numThreads, numWorkers, NotEnoughThreads
return opt.numThreads, numWorkers, opt.maxThreads, NotEnoughThreads
}
if opt.maxThreads < opt.numThreads && opt.maxThreads > 0 {
opt.maxThreads = opt.numThreads
}
metrics.TotalThreads(opt.numThreads)
MaxThreads = opt.numThreads
return opt.numThreads, numWorkers, nil
return opt.numThreads, numWorkers, opt.maxThreads, nil
}
// Init starts the PHP runtime and the configured workers.
@@ -312,7 +316,7 @@ func Init(options ...Option) error {
metrics = opt.metrics
}
totalThreadCount, workerThreadCount, err := calculateMaxThreads(opt)
totalThreadCount, workerThreadCount, maxThreadCount, err := calculateMaxThreads(opt)
if err != nil {
return err
}
@@ -332,11 +336,13 @@ func Init(options ...Option) error {
logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`)
}
requestChan = make(chan *http.Request, opt.numThreads)
if err := initPHPThreads(totalThreadCount); err != nil {
mainThread, err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIni)
if err != nil {
return err
}
regularRequestChan = make(chan *http.Request, totalThreadCount-workerThreadCount)
regularThreads = make([]*phpThread, 0, totalThreadCount-workerThreadCount)
for i := 0; i < totalThreadCount-workerThreadCount; i++ {
thread := getInactivePHPThread()
convertToRegularThread(thread)
@@ -346,8 +352,10 @@ func Init(options ...Option) error {
return err
}
initAutoScaling(mainThread)
if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil {
c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", totalThreadCount))
c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", mainThread.numThreads), zap.Int("max_threads", mainThread.maxThreads))
}
if EmbeddedAppPath != "" {
if c := logger.Check(zapcore.InfoLevel, "embedded PHP app 📦"); c != nil {
@@ -365,17 +373,18 @@ func Shutdown() {
}
drainWorkers()
drainAutoScaling()
drainPHPThreads()
metrics.Shutdown()
requestChan = nil
// Remove the installed app
if EmbeddedAppPath != "" {
_ = os.RemoveAll(EmbeddedAppPath)
}
logger.Debug("FrankenPHP shut down")
isRunning = false
logger.Debug("FrankenPHP shut down")
}
func getLogger() *zap.Logger {
@@ -468,7 +477,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
}
fc.responseWriter = responseWriter
fc.startedAt = time.Now()
// Detect if a worker is available to handle this request
if worker, ok := workers[fc.scriptFilename]; ok {
@@ -476,15 +484,8 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
return nil
}
metrics.StartRequest()
select {
case <-mainThread.done:
case requestChan <- request:
<-fc.done
}
metrics.StopRequest()
// If no worker was available, send the request to non-worker threads
handleRequestWithRegularPHPThreads(request, fc)
return nil
}

View File

@@ -71,6 +71,7 @@ void frankenphp_register_variable_safe(char *key, char *var, size_t val_len,
zend_string *frankenphp_init_persistent_string(const char *string, size_t len);
void frankenphp_release_zend_string(zend_string *z_string);
int frankenphp_reset_opcache(void);
int frankenphp_get_current_memory_limit();
void frankenphp_register_single(zend_string *z_key, char *value, size_t val_len,
zval *track_vars_array);

35
internal/cpu/cpu_unix.go Normal file
View File

@@ -0,0 +1,35 @@
package cpu
// #include <time.h>
import "C"
import (
"runtime"
"time"
)
var cpuCount = runtime.GOMAXPROCS(0)
// ProbeCPUs probes the CPU usage of the process
// if CPUs are not busy, most threads are likely waiting for I/O, so we should scale
// if CPUs are already busy we won't gain much by scaling and want to avoid the overhead of doing so
func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool {
var cpuStart, cpuEnd C.struct_timespec
// note: clock_gettime is a POSIX function
// on Windows we'd need to use QueryPerformanceCounter instead
start := time.Now()
C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuStart)
select {
case <-abort:
return false
case <-time.After(probeTime):
}
C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuEnd)
elapsedTime := float64(time.Since(start).Nanoseconds())
elapsedCpuTime := float64(cpuEnd.tv_sec-cpuStart.tv_sec)*1e9 + float64(cpuEnd.tv_nsec-cpuStart.tv_nsec)
cpuUsage := elapsedCpuTime / elapsedTime / float64(cpuCount)
return cpuUsage < maxCPUUsage
}
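
A usage sketch for this probe, mirroring how the scaling code later in this diff gates thread creation on CPU usage (note that `internal/cpu` is only importable from inside the frankenphp module, hence the package choice):

```go
package frankenphp

import (
	"fmt"
	"time"

	"github.com/dunglas/frankenphp/internal/cpu"
)

// exampleProbe is an illustrative helper, not part of the actual scaling logic.
func exampleProbe(abort chan struct{}) {
	// Sample the process CPU time for 120ms; true means usage stayed below
	// 80% of the available cores, so spawning another thread is likely worthwhile.
	if cpu.ProbeCPUs(120*time.Millisecond, 0.8, abort) {
		fmt.Println("CPUs are mostly idle, scaling up is safe")
	} else {
		fmt.Println("CPUs are busy, skip scaling")
	}
}
```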

View File

@@ -0,0 +1,15 @@
package cpu
import (
"time"
)
// ProbeCPUs is a fallback that always reports that the CPU limit has not been reached
func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool {
select {
case <-abort:
return false
case <-time.After(probeTime):
return true
}
}

View File

@@ -0,0 +1,13 @@
package memory
import "syscall"
func TotalSysMemory() uint64 {
sysInfo := &syscall.Sysinfo_t{}
err := syscall.Sysinfo(sysInfo)
if err != nil {
return 0
}
return uint64(sysInfo.Totalram) * uint64(sysInfo.Unit)
}

View File

@@ -0,0 +1,8 @@
//go:build !linux
package memory
// TotalSysMemory returns 0 if the total system memory cannot be determined
func TotalSysMemory() uint64 {
return 0
}

View File

@@ -42,6 +42,10 @@ type Metrics interface {
// StartWorkerRequest collects started worker requests
StartWorkerRequest(name string)
Shutdown()
QueuedWorkerRequest(name string)
DequeuedWorkerRequest(name string)
QueuedRequest()
DequeuedRequest()
}
type nullMetrics struct{}
@@ -76,6 +80,13 @@ func (n nullMetrics) StartWorkerRequest(string) {
func (n nullMetrics) Shutdown() {
}
func (n nullMetrics) QueuedWorkerRequest(name string) {}
func (n nullMetrics) DequeuedWorkerRequest(name string) {}
func (n nullMetrics) QueuedRequest() {}
func (n nullMetrics) DequeuedRequest() {}
type PrometheusMetrics struct {
registry prometheus.Registerer
totalThreads prometheus.Counter
@@ -87,6 +98,8 @@ type PrometheusMetrics struct {
workerRestarts map[string]prometheus.Counter
workerRequestTime map[string]prometheus.Counter
workerRequestCount map[string]prometheus.Counter
workerQueueDepth map[string]prometheus.Gauge
queueDepth prometheus.Gauge
mu sync.Mutex
}
@@ -236,6 +249,15 @@ func (m *PrometheusMetrics) TotalWorkers(name string, _ int) {
panic(err)
}
}
if _, ok := m.workerQueueDepth[identity]; !ok {
m.workerQueueDepth[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_queue_depth",
})
m.registry.MustRegister(m.workerQueueDepth[identity])
}
}
func (m *PrometheusMetrics) TotalThreads(num int) {
@@ -267,9 +289,32 @@ func (m *PrometheusMetrics) StartWorkerRequest(name string) {
m.busyWorkers[name].Inc()
}
func (m *PrometheusMetrics) QueuedWorkerRequest(name string) {
if _, ok := m.workerQueueDepth[name]; !ok {
return
}
m.workerQueueDepth[name].Inc()
}
func (m *PrometheusMetrics) DequeuedWorkerRequest(name string) {
if _, ok := m.workerQueueDepth[name]; !ok {
return
}
m.workerQueueDepth[name].Dec()
}
func (m *PrometheusMetrics) QueuedRequest() {
m.queueDepth.Inc()
}
func (m *PrometheusMetrics) DequeuedRequest() {
m.queueDepth.Dec()
}
func (m *PrometheusMetrics) Shutdown() {
m.registry.Unregister(m.totalThreads)
m.registry.Unregister(m.busyThreads)
m.registry.Unregister(m.queueDepth)
for _, g := range m.totalWorkers {
m.registry.Unregister(g)
@@ -299,6 +344,10 @@ func (m *PrometheusMetrics) Shutdown() {
m.registry.Unregister(g)
}
for _, g := range m.workerQueueDepth {
m.registry.Unregister(g)
}
m.totalThreads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "frankenphp_total_threads",
Help: "Total number of PHP threads",
@@ -314,6 +363,11 @@ func (m *PrometheusMetrics) Shutdown() {
m.workerRestarts = map[string]prometheus.Counter{}
m.workerCrashes = map[string]prometheus.Counter{}
m.readyWorkers = map[string]prometheus.Gauge{}
m.workerQueueDepth = map[string]prometheus.Gauge{}
m.queueDepth = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "frankenphp_queue_depth",
Help: "Number of regular queued requests",
})
if err := m.registry.Register(m.totalThreads); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
@@ -324,6 +378,11 @@ func (m *PrometheusMetrics) Shutdown() {
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
if err := m.registry.Register(m.queueDepth); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
func getWorkerNameForMetrics(name string) string {
@@ -355,6 +414,11 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
workerRestarts: map[string]prometheus.Counter{},
workerCrashes: map[string]prometheus.Counter{},
readyWorkers: map[string]prometheus.Gauge{},
workerQueueDepth: map[string]prometheus.Gauge{},
queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "frankenphp_queue_depth",
Help: "Number of regular queued requests",
}),
}
if err := m.registry.Register(m.totalThreads); err != nil &&
@@ -367,5 +431,10 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
panic(err)
}
if err := m.registry.Register(m.queueDepth); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
return m
}

View File

@@ -40,6 +40,7 @@ func createPrometheusMetrics() *PrometheusMetrics {
workerRequestCount: make(map[string]prometheus.Counter),
workerCrashes: make(map[string]prometheus.Counter),
workerRestarts: make(map[string]prometheus.Counter),
workerQueueDepth: make(map[string]prometheus.Gauge),
readyWorkers: make(map[string]prometheus.Gauge),
mu: sync.Mutex{},
}

View File

@@ -12,9 +12,11 @@ type Option func(h *opt) error
// If you change this, also update the Caddy module and the documentation.
type opt struct {
numThreads int
maxThreads int
workers []workerOpt
logger *zap.Logger
metrics Metrics
phpIni map[string]string
}
type workerOpt struct {
@@ -33,6 +35,14 @@ func WithNumThreads(numThreads int) Option {
}
}
func WithMaxThreads(maxThreads int) Option {
return func(o *opt) error {
o.maxThreads = maxThreads
return nil
}
}
func WithMetrics(m Metrics) Option {
return func(o *opt) error {
o.metrics = m
@@ -58,3 +68,11 @@ func WithLogger(l *zap.Logger) Option {
return nil
}
}
// WithPhpIni configures user-defined php.ini settings.
func WithPhpIni(overrides map[string]string) Option {
return func(o *opt) error {
o.phpIni = overrides
return nil
}
}
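
For programs embedding FrankenPHP as a library, a minimal sketch of the two new options next to the existing ones (values are illustrative, not defaults, and `/path/to/app/public` is a placeholder document root):

```go
package main

import (
	"log"
	"net/http"

	"github.com/dunglas/frankenphp"
)

func main() {
	err := frankenphp.Init(
		frankenphp.WithNumThreads(4),
		frankenphp.WithMaxThreads(16), // allow additional threads to be spawned at runtime
		frankenphp.WithPhpIni(map[string]string{
			"memory_limit":       "256M",
			"max_execution_time": "30",
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer frankenphp.Shutdown()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		req, err := frankenphp.NewRequestWithContext(r,
			frankenphp.WithRequestDocumentRoot("/path/to/app/public", false))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := frankenphp.ServeHTTP(w, req); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```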

View File

@@ -3,10 +3,13 @@ package frankenphp
// #include "frankenphp.h"
import "C"
import (
"fmt"
"sync"
"github.com/dunglas/frankenphp/internal/memory"
"github.com/dunglas/frankenphp/internal/phpheaders"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// represents the main PHP thread
@@ -15,6 +18,8 @@ type phpMainThread struct {
state *threadState
done chan struct{}
numThreads int
maxThreads int
phpIni map[string]string
commonHeaders map[string]*C.zend_string
knownServerKeys map[string]*C.zend_string
}
@@ -24,63 +29,72 @@ var (
mainThread *phpMainThread
)
// reserve a fixed number of PHP threads on the Go side
func initPHPThreads(numThreads int) error {
// initPHPThreads starts the main PHP thread,
// a fixed number of inactive PHP threads
// and reserves a fixed number of possible PHP threads
func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) {
mainThread = &phpMainThread{
state: newThreadState(),
done: make(chan struct{}),
numThreads: numThreads,
maxThreads: numMaxThreads,
phpIni: phpIni,
}
phpThreads = make([]*phpThread, numThreads)
// initialize all threads as inactive
// initialize the first thread
// this needs to happen before starting the main thread
// since some extensions access environment variables on startup
for i := 0; i < numThreads; i++ {
phpThreads[i] = newPHPThread(i)
convertToInactiveThread(phpThreads[i])
}
// the threadIndex on the main thread defaults to 0 -> phpThreads[0].Pin(...)
initialThread := newPHPThread(0)
phpThreads = []*phpThread{initialThread}
if err := mainThread.start(); err != nil {
return err
return nil, err
}
// initialize all other threads
phpThreads = make([]*phpThread, mainThread.maxThreads)
phpThreads[0] = initialThread
for i := 1; i < mainThread.maxThreads; i++ {
phpThreads[i] = newPHPThread(i)
}
// start the underlying C threads
ready := sync.WaitGroup{}
ready.Add(numThreads)
for _, thread := range phpThreads {
for i := 0; i < numThreads; i++ {
thread := phpThreads[i]
go func() {
if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex))
}
thread.state.waitFor(stateInactive)
thread.boot()
ready.Done()
}()
}
ready.Wait()
return nil
return mainThread, nil
}
func drainPHPThreads() {
doneWG := sync.WaitGroup{}
doneWG.Add(len(phpThreads))
for _, thread := range phpThreads {
thread.handlerMu.Lock()
_ = thread.state.requestSafeStateChange(stateShuttingDown)
close(thread.drainChan)
}
mainThread.state.set(stateShuttingDown)
close(mainThread.done)
for _, thread := range phpThreads {
// shut down all reserved threads
if thread.state.compareAndSwap(stateReserved, stateDone) {
doneWG.Done()
continue
}
// shut down all active threads
go func(thread *phpThread) {
thread.state.waitFor(stateDone)
thread.handlerMu.Unlock()
thread.shutdown()
doneWG.Done()
}(thread)
}
doneWG.Wait()
mainThread.state.set(stateShuttingDown)
mainThread.state.waitFor(stateDone)
mainThread.state.set(stateDone)
mainThread.state.waitFor(stateReserved)
phpThreads = nil
}
@@ -88,6 +102,7 @@ func (mainThread *phpMainThread) start() error {
if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 {
return MainThreadCreationError
}
mainThread.state.waitFor(stateReady)
// cache common request headers as zend_strings (HTTP_ACCEPT, HTTP_USER_AGENT, etc.)
@@ -111,16 +126,74 @@ func getInactivePHPThread() *phpThread {
return thread
}
}
panic("not enough threads reserved")
for _, thread := range phpThreads {
if thread.state.compareAndSwap(stateReserved, stateBootRequested) {
thread.boot()
return thread
}
}
return nil
}
func getPHPThreadAtState(state stateID) *phpThread {
for _, thread := range phpThreads {
if thread.state.is(state) {
return thread
}
}
return nil
}
//export go_frankenphp_main_thread_is_ready
func go_frankenphp_main_thread_is_ready() {
mainThread.setAutomaticMaxThreads()
if mainThread.maxThreads < mainThread.numThreads {
mainThread.maxThreads = mainThread.numThreads
}
mainThread.state.set(stateReady)
mainThread.state.waitFor(stateShuttingDown)
mainThread.state.waitFor(stateDone)
}
// max_threads = auto
// setAutomaticMaxThreads estimates the number of allowed threads from the php.ini memory_limit and the total system memory
// If the system memory cannot be determined, it simply doubles num_threads
func (mainThread *phpMainThread) setAutomaticMaxThreads() {
if mainThread.maxThreads >= 0 {
return
}
perThreadMemoryLimit := int64(C.frankenphp_get_current_memory_limit())
totalSysMemory := memory.TotalSysMemory()
if perThreadMemoryLimit <= 0 || totalSysMemory == 0 {
mainThread.maxThreads = mainThread.numThreads * 2
return
}
maxAllowedThreads := totalSysMemory / uint64(perThreadMemoryLimit)
mainThread.maxThreads = int(maxAllowedThreads)
if c := logger.Check(zapcore.DebugLevel, "Automatic thread limit"); c != nil {
c.Write(zap.Int("perThreadMemoryLimitMB", int(perThreadMemoryLimit/1024/1024)), zap.Int("maxThreads", mainThread.maxThreads))
}
}
//export go_frankenphp_shutdown_main_thread
func go_frankenphp_shutdown_main_thread() {
mainThread.state.set(stateDone)
mainThread.state.set(stateReserved)
}
//export go_get_custom_php_ini
func go_get_custom_php_ini() *C.char {
if mainThread.phpIni == nil {
return nil
}
// pass the php.ini overrides to PHP before startup
// TODO: if needed this would also be possible on a per-thread basis
overrides := ""
for k, v := range mainThread.phpIni {
overrides += fmt.Sprintf("%s=%s\n", k, v)
}
return C.CString(overrides)
}
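
To make the `max_threads auto` estimate in setAutomaticMaxThreads above concrete, a quick sketch of the same arithmetic with illustrative numbers (not defaults):

```go
package main

import "fmt"

func main() {
	// The per-thread budget is the php.ini memory_limit,
	// the ceiling is the total system memory reported by the OS.
	const perThreadMemoryLimit = 128 * 1024 * 1024 // memory_limit = 128M
	const totalSysMemory = 8 * 1024 * 1024 * 1024  // 8 GiB of RAM

	fmt.Println(totalSysMemory / perThreadMemoryLimit) // 64 threads allowed
}
```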

View File

@@ -18,8 +18,9 @@ import (
var testDataPath, _ = filepath.Abs("./testdata")
func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
logger = zap.NewNop() // the logger needs to not be nil
assert.NoError(t, initPHPThreads(1)) // reserve 1 thread
logger = zap.NewNop() // the logger needs to not be nil
_, err := initPHPThreads(1, 1, nil) // boot 1 thread
assert.NoError(t, err)
assert.Len(t, phpThreads, 1)
assert.Equal(t, 0, phpThreads[0].threadIndex)
@@ -31,7 +32,8 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))
_, err := initPHPThreads(1, 1, nil)
assert.NoError(t, err)
// transition to regular thread
convertToRegularThread(phpThreads[0])
@@ -54,7 +56,8 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1))
_, err := initPHPThreads(1, 1, nil)
assert.NoError(t, err)
firstWorker := getDummyWorker("transition-worker-1.php")
secondWorker := getDummyWorker("transition-worker-2.php")
@@ -76,43 +79,39 @@ func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
assert.Nil(t, phpThreads)
}
// try all possible handler transitions
// takes around 200ms and is supposed to force race conditions
func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
numThreads := 10
numRequestsPerThread := 100
isRunning := atomic.Bool{}
isRunning.Store(true)
isDone := atomic.Bool{}
wg := sync.WaitGroup{}
worker1Path := testDataPath + "/transition-worker-1.php"
worker2Path := testDataPath + "/transition-worker-2.php"
assert.NoError(t, Init(
WithNumThreads(numThreads),
WithWorkers(worker1Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithWorkers(worker2Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithWorkers(worker1Path, 1, map[string]string{}, []string{}),
WithWorkers(worker2Path, 1, map[string]string{}, []string{}),
WithLogger(zap.NewNop()),
))
// randomly transition threads between regular, inactive and 2 worker threads
go func() {
for {
for i := 0; i < numThreads; i++ {
switch rand.IntN(4) {
case 0:
convertToRegularThread(phpThreads[i])
case 1:
convertToWorkerThread(phpThreads[i], workers[worker1Path])
case 2:
convertToWorkerThread(phpThreads[i], workers[worker2Path])
case 3:
convertToInactiveThread(phpThreads[i])
}
time.Sleep(time.Millisecond)
if !isRunning.Load() {
return
// try all possible transition permutations, transitioning every millisecond
transitions := allPossibleTransitions(worker1Path, worker2Path)
for i := 0; i < numThreads; i++ {
go func(thread *phpThread, start int) {
for {
for j := start; j < len(transitions); j++ {
if isDone.Load() {
return
}
transitions[j](thread)
time.Sleep(time.Millisecond)
}
start = 0
}
}
}()
}(phpThreads[i], i)
}
// randomly do requests to the 3 endpoints
wg.Add(numThreads)
@@ -132,8 +131,9 @@ func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
}(i)
}
// we are finished as soon as all 1000 requests are done
wg.Wait()
isRunning.Store(false)
isDone.Store(true)
Shutdown()
}
@@ -176,3 +176,20 @@ func assertRequestBody(t *testing.T, url string, expected string) {
body, _ := io.ReadAll(resp.Body)
assert.Equal(t, expected, string(body))
}
// create a mix of possible transitions of workers and regular threads
func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpThread) {
return []func(*phpThread){
convertToRegularThread,
func(thread *phpThread) { thread.shutdown() },
func(thread *phpThread) {
if thread.state.is(stateReserved) {
thread.boot()
}
},
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker1Path]) },
convertToInactiveThread,
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker2Path]) },
convertToInactiveThread,
}
}

View File

@@ -7,23 +7,25 @@ import (
"runtime"
"sync"
"unsafe"
"go.uber.org/zap"
)
// representation of the actual underlying PHP thread
// identified by the index in the phpThreads slice
type phpThread struct {
runtime.Pinner
threadIndex int
requestChan chan *http.Request
drainChan chan struct{}
handlerMu *sync.Mutex
handlerMu sync.Mutex
handler threadHandler
state *threadState
}
// interface that defines how the callbacks from the C thread should be handled
type threadHandler interface {
name() string
beforeScriptExecution() string
afterScriptExecution(exitStatus int)
getActiveRequest() *http.Request
@@ -32,21 +34,55 @@ type threadHandler interface {
func newPHPThread(threadIndex int) *phpThread {
return &phpThread{
threadIndex: threadIndex,
drainChan: make(chan struct{}),
requestChan: make(chan *http.Request),
handlerMu: &sync.Mutex{},
state: newThreadState(),
}
}
// boot starts the underlying PHP thread
func (thread *phpThread) boot() {
// thread must be in reserved state to boot
if !thread.state.compareAndSwap(stateReserved, stateBooting) && !thread.state.compareAndSwap(stateBootRequested, stateBooting) {
logger.Panic("thread is not in reserved state: " + thread.state.name())
return
}
// boot threads as inactive
thread.handlerMu.Lock()
thread.handler = &inactiveThread{thread: thread}
thread.drainChan = make(chan struct{})
thread.handlerMu.Unlock()
// start the actual posix thread - TODO: try this with go threads instead
if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex))
}
thread.state.waitFor(stateInactive)
}
// shutdown the underlying PHP thread
func (thread *phpThread) shutdown() {
if !thread.state.requestSafeStateChange(stateShuttingDown) {
// already shutting down or done
return
}
close(thread.drainChan)
thread.state.waitFor(stateDone)
thread.drainChan = make(chan struct{})
// threads go back to the reserved state from which they can be booted again
if mainThread.state.is(stateReady) {
thread.state.set(stateReserved)
}
}
// change the thread handler safely
// must be called from outside the PHP thread
func (thread *phpThread) setHandler(handler threadHandler) {
logger.Debug("setHandler")
thread.handlerMu.Lock()
defer thread.handlerMu.Unlock()
if !thread.state.requestSafeStateChange(stateTransitionRequested) {
// no state change allowed == shutdown
// no state change allowed == shutdown or done
return
}
close(thread.drainChan)
@@ -95,6 +131,7 @@ func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char {
if scriptName == "" {
return nil
}
// return the name of the PHP script that should be executed
return thread.pinCString(scriptName)
}

258
scaling.go Normal file
View File

@@ -0,0 +1,258 @@
package frankenphp
//#include "frankenphp.h"
//#include <sys/resource.h>
import "C"
import (
"errors"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/cpu"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
// requests have to be stalled for at least this amount of time before scaling
minStallTime = 5 * time.Millisecond
// time to check for CPU usage before scaling a single thread
cpuProbeTime = 120 * time.Millisecond
// do not scale over this amount of CPU usage
maxCpuUsageForScaling = 0.8
// upscale stalled threads every x milliseconds
upscaleCheckTime = 100 * time.Millisecond
// downscale idle threads every x seconds
downScaleCheckTime = 5 * time.Second
// max amount of threads stopped in one iteration of downScaleCheckTime
maxTerminationCount = 10
// autoscaled threads waiting for longer than this time are downscaled
maxThreadIdleTime = 5 * time.Second
)
var (
scaleChan chan *FrankenPHPContext
autoScaledThreads = []*phpThread{}
scalingMu = new(sync.RWMutex)
MaxThreadsReachedError = errors.New("max amount of overall threads reached")
CannotRemoveLastThreadError = errors.New("cannot remove last thread")
WorkerNotFoundError = errors.New("worker not found for given filename")
)
func initAutoScaling(mainThread *phpMainThread) {
if mainThread.maxThreads <= mainThread.numThreads {
scaleChan = nil
return
}
scalingMu.Lock()
scaleChan = make(chan *FrankenPHPContext)
maxScaledThreads := mainThread.maxThreads - mainThread.numThreads
autoScaledThreads = make([]*phpThread, 0, maxScaledThreads)
scalingMu.Unlock()
go startUpscalingThreads(maxScaledThreads, scaleChan, mainThread.done)
go startDownScalingThreads(mainThread.done)
}
func drainAutoScaling() {
scalingMu.Lock()
if c := logger.Check(zapcore.DebugLevel, "shutting down autoscaling"); c != nil {
c.Write(zap.Int("autoScaledThreads", len(autoScaledThreads)))
}
scalingMu.Unlock()
}
func addRegularThread() (*phpThread, error) {
thread := getInactivePHPThread()
if thread == nil {
return nil, MaxThreadsReachedError
}
convertToRegularThread(thread)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
}
func removeRegularThread() error {
regularThreadMu.RLock()
if len(regularThreads) <= 1 {
regularThreadMu.RUnlock()
return CannotRemoveLastThreadError
}
thread := regularThreads[len(regularThreads)-1]
regularThreadMu.RUnlock()
thread.shutdown()
return nil
}
func addWorkerThread(worker *worker) (*phpThread, error) {
thread := getInactivePHPThread()
if thread == nil {
return nil, MaxThreadsReachedError
}
convertToWorkerThread(thread, worker)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
}
func removeWorkerThread(worker *worker) error {
worker.threadMutex.RLock()
if len(worker.threads) <= 1 {
worker.threadMutex.RUnlock()
return CannotRemoveLastThreadError
}
thread := worker.threads[len(worker.threads)-1]
worker.threadMutex.RUnlock()
thread.shutdown()
return nil
}
// scaleWorkerThread adds a worker PHP thread automatically
func scaleWorkerThread(worker *worker) {
scalingMu.Lock()
defer scalingMu.Unlock()
if !mainThread.state.is(stateReady) {
return
}
// probe CPU usage before scaling
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
return
}
thread, err := addWorkerThread(worker)
if err != nil {
if c := logger.Check(zapcore.WarnLevel, "could not increase max_threads, consider raising this limit"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Error(err))
}
return
}
autoScaledThreads = append(autoScaledThreads, thread)
}
// scaleRegularThread adds a regular PHP thread automatically
func scaleRegularThread() {
scalingMu.Lock()
defer scalingMu.Unlock()
if !mainThread.state.is(stateReady) {
return
}
// probe CPU usage before scaling
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
return
}
thread, err := addRegularThread()
if err != nil {
if c := logger.Check(zapcore.WarnLevel, "could not increase max_threads, consider raising this limit"); c != nil {
c.Write(zap.Error(err))
}
return
}
autoScaledThreads = append(autoScaledThreads, thread)
}
func startUpscalingThreads(maxScaledThreads int, scale chan *FrankenPHPContext, done chan struct{}) {
for {
scalingMu.Lock()
scaledThreadCount := len(autoScaledThreads)
scalingMu.Unlock()
if scaledThreadCount >= maxScaledThreads {
// we have reached max_threads, check again later
select {
case <-done:
return
case <-time.After(downScaleCheckTime):
continue
}
}
select {
case fc := <-scale:
timeSinceStalled := time.Since(fc.startedAt)
// if the request has not been stalled long enough, wait and repeat
if timeSinceStalled < minStallTime {
select {
case <-done:
return
case <-time.After(minStallTime - timeSinceStalled):
continue
}
}
// if the request has been stalled long enough, scale
if worker, ok := workers[fc.scriptFilename]; ok {
scaleWorkerThread(worker)
} else {
scaleRegularThread()
}
case <-done:
return
}
}
}
func startDownScalingThreads(done chan struct{}) {
for {
select {
case <-done:
return
case <-time.After(downScaleCheckTime):
deactivateThreads()
}
}
}
// deactivateThreads checks all autoscaled threads and deactivates those that have been idle for too long
func deactivateThreads() {
stoppedThreadCount := 0
scalingMu.Lock()
defer scalingMu.Unlock()
for i := len(autoScaledThreads) - 1; i >= 0; i-- {
thread := autoScaledThreads[i]
// the thread might have been stopped otherwise, remove it
if thread.state.is(stateReserved) {
autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
continue
}
waitTime := thread.state.waitTime()
if stoppedThreadCount > maxTerminationCount || waitTime == 0 {
continue
}
// convert threads to inactive if they have been idle for too long
if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() {
if c := logger.Check(zapcore.DebugLevel, "auto-converting thread to inactive"); c != nil {
c.Write(zap.Int("threadIndex", thread.threadIndex))
}
convertToInactiveThread(thread)
stoppedThreadCount++
autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
continue
}
// TODO: Completely stopping threads is more memory efficient
// Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory)
// Reactivate this if there is a better solution or workaround
//if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
// if c := logger.Check(zapcore.DebugLevel, "auto-stopping thread"); c != nil {
// c.Write(zap.Int("threadIndex", thread.threadIndex))
// }
// thread.shutdown()
// stoppedThreadCount++
// autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
// continue
//}
}
}

60
scaling_test.go Normal file
View File

@@ -0,0 +1,60 @@
package frankenphp
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestScaleARegularThreadUpAndDown(t *testing.T) {
assert.NoError(t, Init(
WithNumThreads(1),
WithMaxThreads(2),
WithLogger(zap.NewNop()),
))
autoScaledThread := phpThreads[1]
// scale up
scaleRegularThread()
assert.Equal(t, stateReady, autoScaledThread.state.get())
assert.IsType(t, &regularThread{}, autoScaledThread.handler)
// on down-scale, the thread will be marked as inactive
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)
Shutdown()
}
func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
workerPath := testDataPath + "/transition-worker-1.php"
assert.NoError(t, Init(
WithNumThreads(2),
WithMaxThreads(3),
WithWorkers(workerPath, 1, map[string]string{}, []string{}),
WithLogger(zap.NewNop()),
))
autoScaledThread := phpThreads[2]
// scale up
scaleWorkerThread(workers[workerPath])
assert.Equal(t, stateReady, autoScaledThread.state.get())
// on down-scale, the thread will be marked as inactive
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)
Shutdown()
}
func setLongWaitTime(thread *phpThread) {
thread.state.mu.Lock()
thread.state.waitingSince = time.Now().Add(-time.Hour)
thread.state.mu.Unlock()
}

View File

@@ -2,19 +2,21 @@ package frankenphp
import (
"slices"
"strconv"
"sync"
"time"
)
type stateID uint8
const (
// lifecycle states of a thread
stateBooting stateID = iota
// lifecycle states of a thread
stateReserved stateID = iota
stateBooting
stateBootRequested
stateShuttingDown
stateDone
// these states are safe to transition from at any time
// these states are 'stable' and safe to transition from at any time
stateInactive
stateReady
@@ -28,10 +30,27 @@ const (
stateTransitionComplete
)
var stateNames = map[stateID]string{
stateReserved: "reserved",
stateBooting: "booting",
stateInactive: "inactive",
stateReady: "ready",
stateShuttingDown: "shutting down",
stateDone: "done",
stateRestarting: "restarting",
stateYielding: "yielding",
stateTransitionRequested: "transition requested",
stateTransitionInProgress: "transition in progress",
stateTransitionComplete: "transition complete",
}
type threadState struct {
currentState stateID
mu sync.RWMutex
subscribers []stateSubscriber
// how long threads have been waiting in stable states
waitingSince time.Time
isWaiting bool
}
type stateSubscriber struct {
@@ -41,7 +60,7 @@ type stateSubscriber struct {
func newThreadState() *threadState {
return &threadState{
currentState: stateBooting,
currentState: stateReserved,
subscribers: []stateSubscriber{},
mu: sync.RWMutex{},
}
@@ -68,8 +87,7 @@ func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
}
func (ts *threadState) name() string {
// TODO: return the actual name for logging/metrics
return "state:" + strconv.Itoa(int(ts.get()))
return stateNames[ts.get()]
}
func (ts *threadState) get() stateID {
@@ -123,8 +141,8 @@ func (ts *threadState) waitFor(states ...stateID) {
func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
ts.mu.Lock()
switch ts.currentState {
// disallow state changes if shutting down
case stateShuttingDown, stateDone:
// disallow state changes if shutting down or done
case stateShuttingDown, stateDone, stateReserved:
ts.mu.Unlock()
return false
// ready and inactive are safe states to transition from
@@ -140,3 +158,34 @@ func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
ts.waitFor(stateReady, stateInactive, stateShuttingDown)
return ts.requestSafeStateChange(nextState)
}
// markAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown
func (ts *threadState) markAsWaiting(isWaiting bool) {
ts.mu.Lock()
if isWaiting {
ts.isWaiting = true
ts.waitingSince = time.Now()
} else {
ts.isWaiting = false
}
ts.mu.Unlock()
}
// isInWaitingState returns true if a thread is waiting for a request or shutdown
func (ts *threadState) isInWaitingState() bool {
ts.mu.RLock()
isWaiting := ts.isWaiting
ts.mu.RUnlock()
return isWaiting
}
// waitTime returns how long the thread has been waiting in a stable state, in ms
func (ts *threadState) waitTime() int64 {
ts.mu.RLock()
waitTime := int64(0)
if ts.isWaiting {
waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli()
}
ts.mu.RUnlock()
return waitTime
}

7
testdata/ini.php vendored Normal file
View File

@@ -0,0 +1,7 @@
<?php
require_once __DIR__.'/_executor.php';
return function () {
echo $_GET['key'] . ':' . ini_get($_GET['key']);
};

29
testdata/performance/api.js vendored Normal file
View File

@@ -0,0 +1,29 @@
import http from 'k6/http'
/**
* Many applications communicate with external APIs or microservices.
* Latencies tend to be much higher than with databases in these cases.
* We'll simulate latencies between 10ms and 150ms.
*/
export const options = {
stages: [
{ duration: '20s', target: 150 },
{ duration: '20s', target: 1000 },
{ duration: '10s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
// 10-150ms latency
const latency = Math.floor(Math.random() * 141) + 10
// 1-30000 work units
const work = Math.ceil(Math.random() * 30000)
// 1-40 output units
const output = Math.ceil(Math.random() * 40)
http.get(http.url`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=${latency}&work=${work}&output=${output}`)
}

27
testdata/performance/computation.js vendored Normal file
View File

@@ -0,0 +1,27 @@
import http from 'k6/http'
/**
* Simulate an application that does very little IO, but a lot of computation
*/
export const options = {
stages: [
{ duration: '20s', target: 80 },
{ duration: '20s', target: 150 },
{ duration: '5s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
// do 1-1,000,000 work units
const work = Math.ceil(Math.random() * 1_000_000)
// output 1-500 units
const output = Math.ceil(Math.random() * 500)
// simulate 0-2ms latency
const latency = Math.floor(Math.random() * 3)
http.get(http.url`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=${latency}&work=${work}&output=${output}`)
}

30
testdata/performance/database.js vendored Normal file
View File

@@ -0,0 +1,30 @@
import http from 'k6/http'
/**
* Modern databases tend to have latencies in the single-digit milliseconds.
* We'll simulate 1-10ms latencies and 1-2 queries per request.
*/
export const options = {
stages: [
{ duration: '20s', target: 100 },
{ duration: '30s', target: 200 },
{ duration: '10s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
// 1-10ms latency
const latency = Math.floor(Math.random() * 10) + 1
// 1-2 iterations per request
const iterations = Math.floor(Math.random() * 2) + 1
// 1-30000 work units per iteration
const work = Math.ceil(Math.random() * 30000)
// 1-40 output units
const output = Math.ceil(Math.random() * 40)
http.get(http.url`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=${latency}&work=${work}&output=${output}&iterations=${iterations}`)
}

16
testdata/performance/flamegraph.sh vendored Executable file
View File

@@ -0,0 +1,16 @@
#!/bin/bash
# install brendangregg's FlameGraph
if [ ! -d "/usr/local/src/flamegraph" ]; then
mkdir /usr/local/src/flamegraph &&
cd /usr/local/src/flamegraph &&
git clone https://github.com/brendangregg/FlameGraph.git
fi
# let the test warm up
sleep 10
# run a 30 second profile on the Caddy admin port
cd /usr/local/src/flamegraph/FlameGraph &&
go tool pprof -raw -output=cpu.txt 'http://localhost:2019/debug/pprof/profile?seconds=30' &&
./stackcollapse-go.pl cpu.txt | ./flamegraph.pl >/go/src/app/testdata/performance/flamegraph.svg

View File

@@ -0,0 +1,28 @@
import http from 'k6/http'
/**
* It is not uncommon for external services to hang for a long time.
* Make sure the server is resilient in such cases and doesn't hang as well.
*/
export const options = {
stages: [
{ duration: '20s', target: 100 },
{ duration: '20s', target: 500 },
{ duration: '20s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
// 2% chance for a request that hangs for 15s
if (Math.random() < 0.02) {
http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=15000&work=10000&output=100`)
return
}
// a regular request
http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=5&work=10000&output=100`)
}

20
testdata/performance/hello-world.js vendored Normal file
View File

@@ -0,0 +1,20 @@
import http from 'k6/http'
/**
* 'Hello world' tests the raw server performance.
*/
export const options = {
stages: [
{ duration: '5s', target: 100 },
{ duration: '20s', target: 400 },
{ duration: '5s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php`)
}

20
testdata/performance/k6.Caddyfile vendored Normal file
View File

@@ -0,0 +1,20 @@
{
frankenphp {
max_threads {$MAX_THREADS}
num_threads {$NUM_THREADS}
worker {
file /go/src/app/testdata/{$WORKER_FILE:sleep.php}
num {$WORKER_THREADS}
}
}
}
:80 {
route {
root /go/src/app/testdata
php {
root /go/src/app/testdata
enable_root_symlink false
}
}
}

39
testdata/performance/perf-test.sh vendored Executable file
View File

@@ -0,0 +1,39 @@
#!/bin/bash
# build the dev.Dockerfile image, start the app container and run the k6 tests
docker build -t frankenphp-dev -f dev.Dockerfile .
export "CADDY_HOSTNAME=http://host.docker.internal"
select filename in ./testdata/performance/*.js; do
read -r -p "How many worker threads? " workerThreads
read -r -p "How many max threads? " maxThreads
numThreads=$((workerThreads + 1))
docker run --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
-p 8125:80 \
-v "$PWD:/go/src/app" \
--name load-test-container \
-e "MAX_THREADS=$maxThreads" \
-e "WORKER_THREADS=$workerThreads" \
-e "NUM_THREADS=$numThreads" \
-itd \
frankenphp-dev \
sh /go/src/app/testdata/performance/start-server.sh
docker exec -d load-test-container sh /go/src/app/testdata/performance/flamegraph.sh
sleep 10
docker run --entrypoint "" -it -v .:/app -w /app \
--add-host "host.docker.internal:host-gateway" \
grafana/k6:latest \
k6 run -e "CADDY_HOSTNAME=$CADDY_HOSTNAME:8125" "./$filename"
docker exec load-test-container curl "http://localhost:2019/frankenphp/threads"
docker stop load-test-container
docker rm load-test-container
done

View File

@@ -0,0 +1,19 @@
# Running Load tests
To run load tests with k6, you need Docker and Bash installed.
Go to the root of this repository and run:
```sh
bash testdata/performance/perf-test.sh
```
This will build the `frankenphp-dev` docker image and run it under the name 'load-test-container'
in the background. Additionally, it will run the `grafana/k6` container and you'll be able to choose
the load test you want to run. A `flamegraph.svg` will be created in the `testdata/performance` directory.
If a load test stops prematurely, you might have to remove the container manually:
```sh
docker stop load-test-container
docker rm load-test-container
```
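To see how many threads FrankenPHP actually spawned during a run, you can also query the thread debug endpoint inside the running container (`perf-test.sh` does this once a run finishes); a minimal sketch, assuming the container is still named `load-test-container`:
```sh
# dump the current thread debug state (regular, worker and inactive threads)
docker exec load-test-container curl "http://localhost:2019/frankenphp/threads"
```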

7
testdata/performance/start-server.sh vendored Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/bash
# build and run FrankenPHP with the k6.Caddyfile
cd /go/src/app/caddy/frankenphp &&
go build --buildvcs=false &&
cd ../../testdata/performance &&
/go/src/app/caddy/frankenphp/frankenphp run -c k6.Caddyfile

32
testdata/performance/timeouts.js vendored Normal file
View File

@@ -0,0 +1,32 @@
import http from 'k6/http'
/**
* Databases or external resources can sometimes become unavailable for short periods of time.
* Make sure the server can recover quickly from periods of unavailability.
* This simulation alternates between a hanging and a working server every 10 seconds.
*/
export const options = {
stages: [
{ duration: '20s', target: 100 },
{ duration: '20s', target: 500 },
{ duration: '20s', target: 0 }
],
thresholds: {
http_req_failed: ['rate<0.01']
}
}
/* global __ENV */
export default function () {
const tenSecondInterval = Math.floor(new Date().getSeconds() / 10)
const shouldHang = tenSecondInterval % 2 === 0
// for 10 seconds at a time, requests run into a max_execution_time timeout
if (shouldHang) {
http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=50000`)
return
}
// every other 10 seconds the resource is back
http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=5&work=30000&output=100`)
}

29
testdata/sleep.php vendored Normal file
View File

@@ -0,0 +1,29 @@
<?php
require_once __DIR__ . '/_executor.php';
return function () {
$sleep = (int)($_GET['sleep'] ?? 0);
$work = (int)($_GET['work'] ?? 0);
$output = (int)($_GET['output'] ?? 1);
$iterations = (int)($_GET['iterations'] ?? 1);
for ($i = 0; $i < $iterations; $i++) {
// simulate work
// with 30_000 iterations we're in the range of a simple Laravel request
// (without JIT and with debug symbols enabled)
for ($j = 0; $j < $work; $j++) {
$a = +$j;
}
// simulate IO, sleep x milliseconds
if ($sleep > 0) {
usleep($sleep * 1000);
}
// simulate output
for ($k = 0; $k < $output; $k++) {
echo "slept for $sleep ms and worked for $work iterations";
}
}
};

View File

@@ -1,78 +0,0 @@
package frankenphp
// #include "frankenphp.h"
import "C"
import (
"net/http"
)
// representation of a non-worker PHP thread
// executes PHP scripts in a web context
// implements the threadHandler interface
type regularThread struct {
state *threadState
thread *phpThread
activeRequest *http.Request
}
func convertToRegularThread(thread *phpThread) {
thread.setHandler(&regularThread{
thread: thread,
state: thread.state,
})
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *regularThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
return handler.thread.transitionToNewHandler()
case stateTransitionComplete:
handler.state.set(stateReady)
return handler.waitForRequest()
case stateReady:
return handler.waitForRequest()
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
// return true if the worker should continue to run
func (handler *regularThread) afterScriptExecution(exitStatus int) {
handler.afterRequest(exitStatus)
}
func (handler *regularThread) getActiveRequest() *http.Request {
return handler.activeRequest
}
func (handler *regularThread) waitForRequest() string {
select {
case <-handler.thread.drainChan:
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
case r := <-requestChan:
handler.activeRequest = r
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if err := updateServerContext(handler.thread, r, true, false); err != nil {
rejectRequest(fc.responseWriter, err.Error())
handler.afterRequest(0)
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
}
// set the scriptFilename that should be executed
return fc.scriptFilename
}
}
func (handler *regularThread) afterRequest(exitStatus int) {
fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
maybeCloseContext(fc)
handler.activeRequest = nil
}

View File

@@ -6,15 +6,13 @@ import (
// representation of a thread with no work assigned to it
// implements the threadHandler interface
// each inactive thread weighs around ~350KB
// keeping threads 'inactive' consumes more memory, but allows a faster transition
type inactiveThread struct {
thread *phpThread
}
func convertToInactiveThread(thread *phpThread) {
if thread.handler == nil {
thread.handler = &inactiveThread{thread: thread}
return
}
thread.setHandler(&inactiveThread{thread: thread})
}
@@ -26,8 +24,11 @@ func (handler *inactiveThread) beforeScriptExecution() string {
return thread.transitionToNewHandler()
case stateBooting, stateTransitionComplete:
thread.state.set(stateInactive)
// wait for external signal to start or shut down
thread.state.markAsWaiting(true)
thread.state.waitFor(stateTransitionRequested, stateShuttingDown)
thread.state.markAsWaiting(false)
return handler.beforeScriptExecution()
case stateShuttingDown:
// signal to stop
@@ -36,10 +37,14 @@ func (handler *inactiveThread) beforeScriptExecution() string {
panic("unexpected state: " + thread.state.name())
}
func (thread *inactiveThread) afterScriptExecution(exitStatus int) {
func (handler *inactiveThread) afterScriptExecution(exitStatus int) {
panic("inactive threads should not execute scripts")
}
func (thread *inactiveThread) getActiveRequest() *http.Request {
panic("inactive threads have no requests")
func (handler *inactiveThread) getActiveRequest() *http.Request {
return nil
}
func (handler *inactiveThread) name() string {
return "Inactive PHP Thread"
}

139
threadregular.go Normal file
View File

@@ -0,0 +1,139 @@
package frankenphp
import (
"net/http"
"sync"
)
// representation of a non-worker PHP thread
// executes PHP scripts in a web context
// implements the threadHandler interface
type regularThread struct {
state *threadState
thread *phpThread
activeRequest *http.Request
}
var (
regularThreads []*phpThread
regularThreadMu = &sync.RWMutex{}
regularRequestChan chan *http.Request
)
func convertToRegularThread(thread *phpThread) {
thread.setHandler(&regularThread{
thread: thread,
state: thread.state,
})
attachRegularThread(thread)
}
// beforeScriptExecution returns the name of the script or an empty string on shutdown
func (handler *regularThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
detachRegularThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateTransitionComplete:
handler.state.set(stateReady)
return handler.waitForRequest()
case stateReady:
return handler.waitForRequest()
case stateShuttingDown:
detachRegularThread(handler.thread)
// signal to stop
return ""
}
panic("unexpected state: " + handler.state.name())
}
// afterScriptExecution finalizes the request after the script was executed
func (handler *regularThread) afterScriptExecution(exitStatus int) {
handler.afterRequest(exitStatus)
}
func (handler *regularThread) getActiveRequest() *http.Request {
return handler.activeRequest
}
func (handler *regularThread) name() string {
return "Regular PHP Thread"
}
func (handler *regularThread) waitForRequest() string {
handler.state.markAsWaiting(true)
var r *http.Request
select {
case <-handler.thread.drainChan:
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
case r = <-regularRequestChan:
}
handler.activeRequest = r
handler.state.markAsWaiting(false)
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
if err := updateServerContext(handler.thread, r, true, false); err != nil {
rejectRequest(fc.responseWriter, err.Error())
handler.afterRequest(0)
handler.thread.Unpin()
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
}
// set the scriptFilename that should be executed
return fc.scriptFilename
}
func (handler *regularThread) afterRequest(exitStatus int) {
fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
maybeCloseContext(fc)
handler.activeRequest = nil
}
func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) {
metrics.StartRequest()
select {
case regularRequestChan <- r:
// a thread was available to handle the request immediately
<-fc.done
metrics.StopRequest()
return
default:
// no thread was available
}
// if no thread was available, mark the request as queued and apply the scaling strategy
metrics.QueuedRequest()
for {
select {
case regularRequestChan <- r:
metrics.DequeuedRequest()
<-fc.done
metrics.StopRequest()
return
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread
}
}
}
func attachRegularThread(thread *phpThread) {
regularThreadMu.Lock()
regularThreads = append(regularThreads, thread)
regularThreadMu.Unlock()
}
func detachRegularThread(thread *phpThread) {
regularThreadMu.Lock()
for i, t := range regularThreads {
if t == thread {
regularThreads = append(regularThreads[:i], regularThreads[i+1:]...)
break
}
}
regularThreadMu.Unlock()
}

View File

@@ -52,6 +52,7 @@ func (handler *workerThread) beforeScriptExecution() string {
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
handler.worker.detachThread(handler.thread)
// signal to stop
return ""
}
@@ -70,6 +71,10 @@ func (handler *workerThread) getActiveRequest() *http.Request {
return handler.fakeRequest
}
func (handler *workerThread) name() string {
return "Worker PHP Thread - " + handler.worker.fileName
}
func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.wait()
metrics.StartWorker(worker.fileName)
@@ -110,10 +115,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
defer func() {
handler.fakeRequest = nil
}()
handler.fakeRequest = nil
// on exit status 0 we just run the worker script again
worker := handler.worker
@@ -152,6 +154,8 @@ func (handler *workerThread) waitForWorkerRequest() bool {
metrics.ReadyWorker(handler.worker.fileName)
}
handler.state.markAsWaiting(true)
var r *http.Request
select {
case <-handler.thread.drainChan:
@@ -159,8 +163,9 @@ func (handler *workerThread) waitForWorkerRequest() bool {
c.Write(zap.String("worker", handler.worker.fileName))
}
// execute opcache_reset if the restart was triggered by the watcher
if watcherIsEnabled && handler.state.is(stateRestarting) {
// flush the opcache when restarting due to watcher or admin api
// note: this is done right before frankenphp_handle_request() returns 'false'
if handler.state.is(stateRestarting) {
C.frankenphp_reset_opcache()
}
@@ -170,6 +175,7 @@ func (handler *workerThread) waitForWorkerRequest() bool {
}
handler.workerRequest = r
handler.state.markAsWaiting(false)
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
@@ -210,7 +216,6 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
maybeCloseContext(fc)
thread.handler.(*workerThread).workerRequest = nil
thread.handler.(*workerThread).inRequest = false
if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))

View File

@@ -28,7 +28,7 @@ func TestWorkersShouldReloadOnMatchingPattern(t *testing.T) {
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
requestBodyHasReset := pollForWorkerReset(t, handler, maxTimesToPollForChanges)
assert.True(t, requestBodyHasReset)
}, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-watcher.php", watch: watch})
}, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-counter.php", watch: watch})
}
func TestWorkersShouldNotReloadOnExcludingPattern(t *testing.T) {
@@ -37,19 +37,19 @@ func TestWorkersShouldNotReloadOnExcludingPattern(t *testing.T) {
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
requestBodyHasReset := pollForWorkerReset(t, handler, minTimesToPollForChanges)
assert.False(t, requestBodyHasReset)
}, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-watcher.php", watch: watch})
}, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-counter.php", watch: watch})
}
func pollForWorkerReset(t *testing.T, handler func(http.ResponseWriter, *http.Request), limit int) bool {
// first we make an initial request to start the request counter
body := fetchBody("GET", "http://example.com/worker-with-watcher.php", handler)
body := fetchBody("GET", "http://example.com/worker-with-counter.php", handler)
assert.Equal(t, "requests:1", body)
// now we spam file updates and check if the request counter resets
for i := 0; i < limit; i++ {
updateTestFile("./testdata/files/test.txt", "updated", t)
time.Sleep(pollingTime * time.Millisecond)
body := fetchBody("GET", "http://example.com/worker-with-watcher.php", handler)
body := fetchBody("GET", "http://example.com/worker-with-counter.php", handler)
if body == "requests:1" {
return true
}

View File

@@ -56,7 +56,8 @@ func initWorkers(opt []workerOpt) error {
return nil
}
if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
watcherIsEnabled = true
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, getLogger()); err != nil {
return err
}
@@ -89,7 +90,12 @@ func drainWorkers() {
watcher.DrainWatcher()
}
func restartWorkers() {
// RestartWorkers attempts to restart all workers gracefully
func RestartWorkers() {
// disallow scaling threads while restarting workers
scalingMu.Lock()
defer scalingMu.Unlock()
ready := sync.WaitGroup{}
threadsToRestart := make([]*phpThread, 0)
for _, worker := range workers {
@@ -97,7 +103,8 @@ func restartWorkers() {
ready.Add(len(worker.threads))
for _, thread := range worker.threads {
if !thread.state.requestSafeStateChange(stateRestarting) {
// no state change allowed = shutdown
// no state change allowed == thread is shutting down
// we'll proceed to restart all other threads anyway
continue
}
close(thread.drainChan)
@@ -143,6 +150,14 @@ func (worker *worker) detachThread(thread *phpThread) {
worker.threadMutex.Unlock()
}
func (worker *worker) countThreads() int {
worker.threadMutex.RLock()
l := len(worker.threads)
worker.threadMutex.RUnlock()
return l
}
func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
metrics.StartWorkerRequest(fc.scriptFilename)
@@ -156,13 +171,22 @@ func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
return
default:
// thread is busy, continue
}
}
worker.threadMutex.RUnlock()
// if no thread was available, fan the request out to all threads
// TODO: theoretically there could be autoscaling of threads here
worker.requestChan <- r
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
// if no thread was available, mark the request as queued and apply the scaling strategy
metrics.QueuedWorkerRequest(fc.scriptFilename)
for {
select {
case worker.requestChan <- r:
metrics.DequeuedWorkerRequest(fc.scriptFilename)
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
return
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread
}
}
}