frankenphp/scaling.go
Marc 1d74b2caa8 feat: define domain specific workers in php_server and php blocks (#1509)
* add module (php_server directive) based workers

* refactor moduleID to uintptr for faster comparisons

* let workers inherit environment variables and root from php_server

* caddy can shift FrankenPHPModules in memory for some godforsaken reason, so we can't rely on them staying the same

* remove debugging statement

* fix tests

* refactor moduleID to uint64 for faster comparisons

* actually allow multiple workers per script filename

* remove logging

* utility function

* reuse an existing worker with the same filename and environment: when newWorker is called with a filepath that already has a suitable worker, simply add to its thread count

* no cleanup happens between tests, so restore old global worker overwriting logic

* add test, use getWorker(ForContext) function in frankenphp.go as well

* restore the error on a second global worker with the same filename

* refactor to using name instead of moduleID

* nicer name

* nicer name

* add more tests

* remove test case already covered by previous test

* revert back to single variable, moduleIDs no longer relevant

* update comment

* figure out the worker to use in FrankenPHPModule::ServeHTTP

* add caddy/config_tests, add --retry 5 to download

* add caddy/config_tests

* consolidate the logic a bit, move worker thread addition into moduleWorkers parsing, before workers are actually created

* implement suggestions as far as possible

* fixup

* remove tags

* feat: download the mostly static binary when possible (#1467)

* feat: download the mostly static binary when possible

* cs

* docs: remove wildcard matcher from root directive (#1513)

* docs: update README with additional documentation links

Add link to classic mode, efficiently serving large static files and monitoring FrankenPHP

Signed-off-by: Romain Bastide <romain.bastide@orange.com>

* ci: combine dependabot updates for one group into a single pull request

* feat: compatibility with libphp.dylib on macOS

* feat: upgrade to Caddy 2.10

* feat: upgrade to Caddy 2.10

* chore: run prettier

* fix: build-static.sh consecutive builds (#1496)

* fix consecutive builds

* use minor version in PHP_VERSION

* install jq in centos container

* fix "arm64" download arch for spc binary

* jq is not available as a rpm download

* linter

* specify php 8.4 default

specify 8.4 so that we only switch to 8.5 manually once we've made sure it works
also allows running without jq installed

* Apply suggestions from code review

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

---------

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

* chore: update Go and toolchain version (#1526)

* apply suggestions one by one - scriptpath only

* generate unique worker names by filename and number

* support worker config from embedded apps

* rename back to make sure we don't accidentally add FrankenPHPApp workers to the slice

* fix test after changing error message

* use 🧩 for module workers

* use 🌍 for global workers :)

* revert 1c414cebbc

* revert 4cc8893ced

* apply suggestions

* add dynamic config loading test of module worker

* fix test

* minor changes

---------

Signed-off-by: Romain Bastide <romain.bastide@orange.com>
Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
Co-authored-by: Indra Gunawan <hello@indra.my.id>
Co-authored-by: Romain Bastide <romain.bastide@orange.com>
2025-05-05 16:14:19 +02:00


package frankenphp
//#include "frankenphp.h"
//#include <sys/resource.h>
import "C"
import (
"context"
"errors"
"log/slog"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/cpu"
)
const (
// requests have to be stalled for at least this amount of time before scaling
minStallTime = 5 * time.Millisecond
// time to check for CPU usage before scaling a single thread
cpuProbeTime = 120 * time.Millisecond
// do not scale over this amount of CPU usage
maxCpuUsageForScaling = 0.8
// interval between checks for downscaling idle threads
downScaleCheckTime = 5 * time.Second
// max amount of threads stopped in one iteration of downScaleCheckTime
maxTerminationCount = 10
// autoscaled threads waiting for longer than this time are downscaled
maxThreadIdleTime = 5 * time.Second
)
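// shared autoscaling state: scaleChan receives the contexts of stalled requests
// (nil when autoscaling is disabled), autoScaledThreads lists the threads added
// by autoscaling, and scalingMu guards access to both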
var (
ErrMaxThreadsReached = errors.New("max amount of overall threads reached")
scaleChan chan *frankenPHPContext
autoScaledThreads = []*phpThread{}
scalingMu = new(sync.RWMutex)
)
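// initAutoScaling enables autoscaling only if max_threads allows more threads than
// are configured; otherwise scaleChan stays nil and autoscaling is disabled.
// It starts one goroutine for upscaling and one for downscaling.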
func initAutoScaling(mainThread *phpMainThread) {
if mainThread.maxThreads <= mainThread.numThreads {
scaleChan = nil
return
}
scalingMu.Lock()
scaleChan = make(chan *frankenPHPContext)
maxScaledThreads := mainThread.maxThreads - mainThread.numThreads
autoScaledThreads = make([]*phpThread, 0, maxScaledThreads)
scalingMu.Unlock()
go startUpscalingThreads(maxScaledThreads, scaleChan, mainThread.done)
go startDownScalingThreads(mainThread.done)
}
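// drainAutoScaling logs how many auto-scaled threads are left when autoscaling shuts down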
func drainAutoScaling() {
scalingMu.Lock()
logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down autoscaling", slog.Int("autoScaledThreads", len(autoScaledThreads)))
scalingMu.Unlock()
}
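// addRegularThread converts an inactive thread into a regular thread and blocks
// until it is ready. It returns ErrMaxThreadsReached if no inactive thread is left.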
func addRegularThread() (*phpThread, error) {
thread := getInactivePHPThread()
if thread == nil {
return nil, ErrMaxThreadsReached
}
convertToRegularThread(thread)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
}
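// addWorkerThread converts an inactive thread into a thread for the given worker
// and blocks until it is ready. It returns ErrMaxThreadsReached if no inactive thread is left.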
func addWorkerThread(worker *worker) (*phpThread, error) {
thread := getInactivePHPThread()
if thread == nil {
return nil, ErrMaxThreadsReached
}
convertToWorkerThread(thread, worker)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
}
// scaleWorkerThread adds a worker PHP thread automatically
func scaleWorkerThread(worker *worker) {
scalingMu.Lock()
defer scalingMu.Unlock()
if !mainThread.state.is(stateReady) {
return
}
// probe CPU usage before scaling
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
return
}
thread, err := addWorkerThread(worker)
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "could not increase max_threads, consider raising this limit", slog.String("worker", worker.name), slog.Any("error", err))
return
}
autoScaledThreads = append(autoScaledThreads, thread)
}
// scaleRegularThread adds a regular PHP thread automatically
func scaleRegularThread() {
scalingMu.Lock()
defer scalingMu.Unlock()
if !mainThread.state.is(stateReady) {
return
}
// probe CPU usage before scaling
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
return
}
thread, err := addRegularThread()
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "could not increase max_threads, consider raising this limit", slog.Any("error", err))
return
}
autoScaledThreads = append(autoScaledThreads, thread)
}
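// startUpscalingThreads receives stalled request contexts on the scale channel and
// adds worker or regular threads accordingly, up to maxScaledThreads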
func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}) {
for {
scalingMu.Lock()
scaledThreadCount := len(autoScaledThreads)
scalingMu.Unlock()
if scaledThreadCount >= maxScaledThreads {
// we have reached max_threads, check again later
select {
case <-done:
return
case <-time.After(downScaleCheckTime):
continue
}
}
select {
case fc := <-scale:
timeSinceStalled := time.Since(fc.startedAt)
// if the request has not been stalled long enough, wait and repeat
if timeSinceStalled < minStallTime {
select {
case <-done:
return
case <-time.After(minStallTime - timeSinceStalled):
continue
}
}
// if the request has been stalled long enough, scale
if worker, ok := workers[getWorkerKey(fc.workerName, fc.scriptFilename)]; ok {
scaleWorkerThread(worker)
} else {
scaleRegularThread()
}
case <-done:
return
}
}
}
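// startDownScalingThreads periodically deactivates threads that have been idle for too long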
func startDownScalingThreads(done chan struct{}) {
for {
select {
case <-done:
return
case <-time.After(downScaleCheckTime):
deactivateThreads()
}
}
}
// deactivateThreads checks all threads and removes those that have been inactive for too long
func deactivateThreads() {
stoppedThreadCount := 0
scalingMu.Lock()
defer scalingMu.Unlock()
for i := len(autoScaledThreads) - 1; i >= 0; i-- {
thread := autoScaledThreads[i]
// the thread might have been stopped by other means, remove it from the autoscaled list
if thread.state.is(stateReserved) {
autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
continue
}
waitTime := thread.state.waitTime()
if stoppedThreadCount > maxTerminationCount || waitTime == 0 {
continue
}
// convert threads to inactive if they have been idle for too long
if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() {
logger.LogAttrs(context.Background(), slog.LevelDebug, "auto-converting thread to inactive", slog.Int("threadIndex", thread.threadIndex))
convertToInactiveThread(thread)
stoppedThreadCount++
autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
continue
}
// TODO: Completely stopping threads is more memory efficient
// Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory)
// Reactivate this if there is a better solution or workaround
// if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
// logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("threadIndex", thread.threadIndex))
// thread.shutdown()
// stoppedThreadCount++
// autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
// continue
// }
}
}
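
The producer side of scaleChan is not part of this file. As a minimal sketch, assuming a hypothetical helper named requestStalled in the request-handling path (not FrankenPHP's actual API), signalling the upscaling goroutine could look like this:

func requestStalled(fc *frankenPHPContext) {
	// when max_threads leaves no headroom, initAutoScaling left scaleChan nil
	if scaleChan == nil {
		return
	}
	select {
	case scaleChan <- fc:
		// startUpscalingThreads will compare fc.startedAt against minStallTime
	default:
		// no receiver ready (scaling busy or maxed out); drop the signal
	}
}

This only illustrates the hand-off point: scaleChan connects request handling to startUpscalingThreads, which then decides between scaleWorkerThread and scaleRegularThread based on the request's worker.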