mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-12-24 13:38:11 +08:00
Compare commits
50 Commits
v1.11.0
...
feat/task-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd40e62cb8 | ||
|
|
acf423f9f0 | ||
|
|
02c27fc2a9 | ||
|
|
d54f736db7 | ||
|
|
694b6188c0 | ||
|
|
b275cd58f8 | ||
|
|
3f63a4d137 | ||
|
|
e801a49f3f | ||
|
|
12b6aaeac6 | ||
|
|
a5a9351020 | ||
|
|
03d886d32e | ||
|
|
8144a06ebb | ||
|
|
8a5d489c85 | ||
|
|
6b9c236d9a | ||
|
|
02a3b3f56d | ||
|
|
117b415e8b | ||
|
|
7565628516 | ||
|
|
05bf065a1b | ||
|
|
58d1761fe8 | ||
|
|
b23f3f833e | ||
|
|
268d294313 | ||
|
|
77fec2b4a7 | ||
|
|
df7e77d3a6 | ||
|
|
0dff2a2790 | ||
|
|
0c0a0cb19b | ||
|
|
99bb21f646 | ||
|
|
b8addd76e7 | ||
|
|
639817ebef | ||
|
|
9c36ed4624 | ||
|
|
83c7a88ec7 | ||
|
|
c16665ae78 | ||
|
|
0d43efff35 | ||
|
|
eb2b575cbc | ||
|
|
f5e6a045b9 | ||
|
|
7a2bb89c9b | ||
|
|
2297616552 | ||
|
|
7982b3af59 | ||
|
|
6c3e1d6af6 | ||
|
|
2387a9d74c | ||
|
|
7438edd676 | ||
|
|
9c4cf7e2d8 | ||
|
|
7f52e2d116 | ||
|
|
f43c8bb1bf | ||
|
|
6e79380ddc | ||
|
|
a102da8171 | ||
|
|
a6999209d3 | ||
|
|
65e11372c1 | ||
|
|
8ad2351abc | ||
|
|
abdb279bd3 | ||
|
|
d852f1cd4b |
18
caddy/app.go
18
caddy/app.go
@@ -33,6 +33,8 @@ type FrankenPHPApp struct {
|
||||
MaxThreads int `json:"max_threads,omitempty"`
|
||||
// Workers configures the worker scripts to start.
|
||||
Workers []workerConfig `json:"workers,omitempty"`
|
||||
// TaskWorkers configures the task worker scripts to start.
|
||||
TaskWorkers []workerConfig `json:"task_workers,omitempty"`
|
||||
// Overwrites the default php ini configuration
|
||||
PhpIni map[string]string `json:"php_ini,omitempty"`
|
||||
// The maximum amount of time a request may be stalled waiting for a thread
|
||||
@@ -127,6 +129,15 @@ func (f *FrankenPHPApp) Start() error {
|
||||
|
||||
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...))
|
||||
}
|
||||
for _, tw := range f.TaskWorkers {
|
||||
workerOpts := []frankenphp.WorkerOption{
|
||||
frankenphp.WithWorkerEnv(tw.Env),
|
||||
frankenphp.WithWorkerWatchMode(tw.Watch),
|
||||
frankenphp.AsTaskWorker(true, 0), // TODO: maxQueueLen configurable here?
|
||||
}
|
||||
|
||||
opts = append(opts, frankenphp.WithWorkers(tw.Name, repl.ReplaceKnown(tw.FileName, ""), tw.Num, workerOpts...))
|
||||
}
|
||||
|
||||
frankenphp.Shutdown()
|
||||
if err := frankenphp.Init(opts...); err != nil {
|
||||
@@ -234,6 +245,13 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
|
||||
}
|
||||
}
|
||||
|
||||
case "task_worker":
|
||||
twc, err := parseWorkerConfig(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.TaskWorkers = append(f.TaskWorkers, twc)
|
||||
|
||||
case "worker":
|
||||
wc, err := parseWorkerConfig(d)
|
||||
if err != nil {
|
||||
|
||||
@@ -1423,3 +1423,31 @@ func TestWorkerMatchDirectiveWithoutFileServer(t *testing.T) {
|
||||
// the request should completely fall through the php_server module
|
||||
tester.AssertGetResponse("http://localhost:"+testPort+"/static.txt", http.StatusNotFound, "Request falls through")
|
||||
}
|
||||
|
||||
func TestServerWithTaskWorker(t *testing.T) {
|
||||
tester := caddytest.NewTester(t)
|
||||
taskWorker, err := fastabs.FastAbs("../testdata/tasks/task-worker.php")
|
||||
require.NoError(t, err)
|
||||
tester.InitServer(`
|
||||
{
|
||||
skip_install_trust
|
||||
admin localhost:2999
|
||||
|
||||
frankenphp {
|
||||
num_threads 2
|
||||
task_worker `+taskWorker+` {
|
||||
num 1
|
||||
}
|
||||
}
|
||||
}
|
||||
`, "caddyfile")
|
||||
|
||||
debugState := getDebugState(t, tester)
|
||||
require.Len(t, debugState.ThreadDebugStates, 2, "there should be 3 threads")
|
||||
require.Equal(
|
||||
t,
|
||||
debugState.ThreadDebugStates[1].Name,
|
||||
"Task Worker PHP Thread - "+taskWorker,
|
||||
"the second spawned thread should be the task worker",
|
||||
)
|
||||
}
|
||||
|
||||
6
cgi.go
6
cgi.go
@@ -277,13 +277,13 @@ func splitPos(path string, splitPath []string) int {
|
||||
// See: https://github.com/php/php-src/blob/345e04b619c3bc11ea17ee02cdecad6ae8ce5891/main/SAPI.h#L72
|
||||
//
|
||||
//export go_update_request_info
|
||||
func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) C.bool {
|
||||
func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) {
|
||||
thread := phpThreads[threadIndex]
|
||||
fc := thread.getRequestContext()
|
||||
request := fc.request
|
||||
|
||||
if request == nil {
|
||||
return C.bool(fc.worker != nil)
|
||||
return
|
||||
}
|
||||
|
||||
authUser, authPassword, ok := request.BasicAuth()
|
||||
@@ -311,8 +311,6 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info)
|
||||
info.request_uri = thread.pinCString(request.URL.RequestURI())
|
||||
|
||||
info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor)
|
||||
|
||||
return C.bool(fc.worker != nil)
|
||||
}
|
||||
|
||||
// SanitizedPathJoin performs filepath.Join(root, reqPath) that
|
||||
|
||||
77
frankenphp.c
77
frankenphp.c
@@ -73,6 +73,7 @@ frankenphp_config frankenphp_get_config() {
|
||||
bool should_filter_var = 0;
|
||||
__thread uintptr_t thread_index;
|
||||
__thread bool is_worker_thread = false;
|
||||
__thread bool is_task_worker_thread = false;
|
||||
__thread zval *os_environment = NULL;
|
||||
|
||||
static void frankenphp_update_request_context() {
|
||||
@@ -82,7 +83,12 @@ static void frankenphp_update_request_context() {
|
||||
/* status It is not reset by zend engine, set it to 200. */
|
||||
SG(sapi_headers).http_response_code = 200;
|
||||
|
||||
is_worker_thread = go_update_request_info(thread_index, &SG(request_info));
|
||||
go_update_request_info(thread_index, &SG(request_info));
|
||||
}
|
||||
|
||||
void frankenphp_update_thread_context(bool is_worker, bool is_task_worker) {
|
||||
is_worker_thread = is_worker;
|
||||
is_task_worker_thread = is_task_worker;
|
||||
}
|
||||
|
||||
static void frankenphp_free_request_context() {
|
||||
@@ -411,6 +417,49 @@ PHP_FUNCTION(frankenphp_response_headers) /* {{{ */
|
||||
}
|
||||
/* }}} */
|
||||
|
||||
/* Handle a message in task worker mode */
|
||||
static bool frankenphp_handle_message(zend_fcall_info fci,
|
||||
zend_fcall_info_cache fcc) {
|
||||
zval *arg = go_frankenphp_worker_handle_task(thread_index);
|
||||
if (arg == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Call the PHP func passed to frankenphp_handle_request() */
|
||||
zval retval = {0};
|
||||
fci.size = sizeof fci;
|
||||
fci.retval = &retval;
|
||||
fci.params = arg;
|
||||
fci.param_count = 1;
|
||||
zend_bool status = zend_call_function(&fci, &fcc) == SUCCESS;
|
||||
|
||||
if (!status || Z_TYPE(retval) == IS_UNDEF) {
|
||||
go_frankenphp_finish_task(thread_index, NULL);
|
||||
zval_ptr_dtor(arg);
|
||||
} else {
|
||||
go_frankenphp_finish_task(thread_index, &retval);
|
||||
}
|
||||
|
||||
zval_ptr_dtor(&retval);
|
||||
|
||||
/*
|
||||
* If an exception occurred, print the message to the client before
|
||||
* exiting
|
||||
*/
|
||||
if (EG(exception) && !zend_is_unwind_exit(EG(exception)) &&
|
||||
!zend_is_graceful_exit(EG(exception))) {
|
||||
zend_exception_error(EG(exception), E_ERROR);
|
||||
zend_bailout();
|
||||
}
|
||||
|
||||
zend_try { php_output_end_all(); }
|
||||
zend_end_try();
|
||||
|
||||
zval_ptr_dtor(arg);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
PHP_FUNCTION(frankenphp_handle_request) {
|
||||
zend_fcall_info fci;
|
||||
zend_fcall_info_cache fcc;
|
||||
@@ -420,6 +469,13 @@ PHP_FUNCTION(frankenphp_handle_request) {
|
||||
ZEND_PARSE_PARAMETERS_END();
|
||||
|
||||
if (!is_worker_thread) {
|
||||
|
||||
/* thread is a task worker
|
||||
* handle the message and do not reset globals */
|
||||
if (is_task_worker_thread) {
|
||||
bool keep_running = frankenphp_handle_message(fci, fcc);
|
||||
RETURN_BOOL(keep_running);
|
||||
}
|
||||
/* not a worker, throw an error */
|
||||
zend_throw_exception(
|
||||
spl_ce_RuntimeException,
|
||||
@@ -484,6 +540,25 @@ PHP_FUNCTION(frankenphp_handle_request) {
|
||||
RETURN_TRUE;
|
||||
}
|
||||
|
||||
PHP_FUNCTION(frankenphp_send_request) {
|
||||
zval *zv;
|
||||
char *worker_name = NULL;
|
||||
size_t worker_name_len = 0;
|
||||
|
||||
ZEND_PARSE_PARAMETERS_START(1, 2);
|
||||
Z_PARAM_ZVAL(zv);
|
||||
Z_PARAM_OPTIONAL
|
||||
Z_PARAM_STRING(worker_name, worker_name_len);
|
||||
ZEND_PARSE_PARAMETERS_END();
|
||||
|
||||
char *error = go_frankenphp_send_request(thread_index, zv, worker_name,
|
||||
worker_name_len);
|
||||
if (error) {
|
||||
zend_throw_exception(spl_ce_RuntimeException, error, 0);
|
||||
RETURN_THROWS();
|
||||
}
|
||||
}
|
||||
|
||||
PHP_FUNCTION(headers_send) {
|
||||
zend_long response_code = 200;
|
||||
|
||||
|
||||
@@ -37,6 +37,8 @@ import (
|
||||
"unsafe"
|
||||
// debug on Linux
|
||||
//_ "github.com/ianlancetaylor/cgosymbolizer"
|
||||
|
||||
"github.com/dunglas/frankenphp/internal/watcher"
|
||||
)
|
||||
|
||||
type contextKeyStruct struct{}
|
||||
@@ -52,7 +54,8 @@ var (
|
||||
ErrScriptExecution = errors.New("error during PHP script execution")
|
||||
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
|
||||
|
||||
isRunning bool
|
||||
isRunning bool
|
||||
watcherIsEnabled bool
|
||||
|
||||
loggerMu sync.RWMutex
|
||||
logger *slog.Logger
|
||||
@@ -151,6 +154,10 @@ func calculateMaxThreads(opt *opt) (int, int, int, error) {
|
||||
numWorkers += opt.workers[i].num
|
||||
}
|
||||
|
||||
for _, tw := range opt.taskWorkers {
|
||||
numWorkers += tw.num
|
||||
}
|
||||
|
||||
numThreadsIsSet := opt.numThreads > 0
|
||||
maxThreadsIsSet := opt.maxThreads != 0
|
||||
maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go)
|
||||
@@ -279,10 +286,23 @@ func Init(options ...Option) error {
|
||||
convertToRegularThread(getInactivePHPThread())
|
||||
}
|
||||
|
||||
directoriesToWatch := getDirectoriesToWatch(append(opt.workers, opt.taskWorkers...))
|
||||
watcherIsEnabled = len(directoriesToWatch) > 0 // watcherIsEnabled needs to be set before initWorkers()
|
||||
|
||||
if err := initWorkers(opt.workers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := initTaskWorkers(opt.taskWorkers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if watcherIsEnabled {
|
||||
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
initAutoScaling(mainThread)
|
||||
|
||||
ctx := context.Background()
|
||||
@@ -300,8 +320,11 @@ func Shutdown() {
|
||||
return
|
||||
}
|
||||
|
||||
drainWatcher()
|
||||
if watcherIsEnabled {
|
||||
watcher.DrainWatcher()
|
||||
}
|
||||
drainAutoScaling()
|
||||
drainTaskWorkers()
|
||||
drainPHPThreads()
|
||||
|
||||
metrics.Shutdown()
|
||||
@@ -629,3 +652,12 @@ func timeoutChan(timeout time.Duration) <-chan time.Time {
|
||||
|
||||
return time.After(timeout)
|
||||
}
|
||||
|
||||
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
|
||||
directoriesToWatch := []string{}
|
||||
for _, w := range workerOpts {
|
||||
directoriesToWatch = append(directoriesToWatch, w.watch...)
|
||||
}
|
||||
|
||||
return directoriesToWatch
|
||||
}
|
||||
|
||||
@@ -86,4 +86,6 @@ void frankenphp_register_bulk(
|
||||
|
||||
void register_extensions(zend_module_entry *m, int len);
|
||||
|
||||
void frankenphp_update_thread_context(bool is_worker, bool is_task_worker);
|
||||
|
||||
#endif
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
|
||||
function frankenphp_handle_request(callable $callback): bool {}
|
||||
|
||||
function frankenphp_send_request(mixed $task, string $workerName = ''): void {}
|
||||
|
||||
function headers_send(int $status = 200): int {}
|
||||
|
||||
function frankenphp_finish_request(): bool {}
|
||||
|
||||
@@ -6,6 +6,12 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1,
|
||||
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_send_request, 0, 1,
|
||||
IS_VOID, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, task, IS_MIXED, 0)
|
||||
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "\"\"")
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0)
|
||||
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200")
|
||||
ZEND_END_ARG_INFO()
|
||||
@@ -31,6 +37,7 @@ ZEND_END_ARG_INFO()
|
||||
#define arginfo_apache_response_headers arginfo_frankenphp_response_headers
|
||||
|
||||
ZEND_FUNCTION(frankenphp_handle_request);
|
||||
ZEND_FUNCTION(frankenphp_send_request);
|
||||
ZEND_FUNCTION(headers_send);
|
||||
ZEND_FUNCTION(frankenphp_finish_request);
|
||||
ZEND_FUNCTION(frankenphp_request_headers);
|
||||
@@ -39,6 +46,7 @@ ZEND_FUNCTION(frankenphp_response_headers);
|
||||
// clang-format off
|
||||
static const zend_function_entry ext_functions[] = {
|
||||
ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request)
|
||||
ZEND_FE(frankenphp_send_request, arginfo_frankenphp_send_request)
|
||||
ZEND_FE(headers_send, arginfo_headers_send)
|
||||
ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request)
|
||||
ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request)
|
||||
|
||||
20
options.go
20
options.go
@@ -22,6 +22,7 @@ type opt struct {
|
||||
numThreads int
|
||||
maxThreads int
|
||||
workers []workerOpt
|
||||
taskWorkers []workerOpt
|
||||
logger *slog.Logger
|
||||
metrics Metrics
|
||||
phpIni map[string]string
|
||||
@@ -35,6 +36,8 @@ type workerOpt struct {
|
||||
env PreparedEnv
|
||||
watch []string
|
||||
maxConsecutiveFailures int
|
||||
isTaskWorker bool
|
||||
maxQueueLen int
|
||||
}
|
||||
|
||||
// WithNumThreads configures the number of PHP threads to start.
|
||||
@@ -80,7 +83,11 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption)
|
||||
}
|
||||
}
|
||||
|
||||
o.workers = append(o.workers, worker)
|
||||
if worker.isTaskWorker {
|
||||
o.taskWorkers = append(o.taskWorkers, worker)
|
||||
} else {
|
||||
o.workers = append(o.workers, worker)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -141,3 +148,14 @@ func WithMaxWaitTime(maxWaitTime time.Duration) Option {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: AsTaskWorker configures the worker as a task worker.
|
||||
// no http requests will be handled.
|
||||
// no globals resetting will be performed between tasks.
|
||||
func AsTaskWorker(isTaskWorker bool, maxQueueLen int) WorkerOption {
|
||||
return func(w *workerOpt) error {
|
||||
w.isTaskWorker = isTaskWorker
|
||||
w.maxQueueLen = maxQueueLen
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,6 +132,10 @@ func (thread *phpThread) pinCString(s string) *C.char {
|
||||
return thread.pinString(s + "\x00")
|
||||
}
|
||||
|
||||
func (thread *phpThread) updateContext(isWorker bool, isTaskWorker bool) {
|
||||
C.frankenphp_update_thread_context(C.bool(isWorker), C.bool(isTaskWorker))
|
||||
}
|
||||
|
||||
//export go_frankenphp_before_script_execution
|
||||
func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char {
|
||||
thread := phpThreads[threadIndex]
|
||||
|
||||
16
testdata/tasks/task-dispatcher-array.php
vendored
Normal file
16
testdata/tasks/task-dispatcher-array.php
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__ . '/../_executor.php';
|
||||
|
||||
return function () {
|
||||
$taskCount = $_GET['count'] ?? 0;
|
||||
$workerName = $_GET['worker'] ?? '';
|
||||
for ($i = 0; $i < $taskCount; $i++) {
|
||||
frankenphp_send_request([
|
||||
'task' => "array task$i",
|
||||
'worker' => $workerName,
|
||||
'index' => $i,
|
||||
]);
|
||||
}
|
||||
echo "dispatched $taskCount tasks\n";
|
||||
};
|
||||
12
testdata/tasks/task-dispatcher-string.php
vendored
Normal file
12
testdata/tasks/task-dispatcher-string.php
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
require_once __DIR__.'/../_executor.php';
|
||||
|
||||
return function () {
|
||||
$taskCount = $_GET['count'] ?? 0;
|
||||
$workerName = $_GET['worker'] ?? '';
|
||||
for ($i = 0; $i < $taskCount; $i++) {
|
||||
frankenphp_send_request("task$i", $workerName);
|
||||
}
|
||||
echo "dispatched $taskCount tasks\n";
|
||||
};
|
||||
14
testdata/tasks/task-worker.php
vendored
Normal file
14
testdata/tasks/task-worker.php
vendored
Normal file
@@ -0,0 +1,14 @@
|
||||
<?php
|
||||
|
||||
$handleFunc = function ($task) {
|
||||
var_dump($task);
|
||||
echo $_SERVER['CUSTOM_VAR'] ?? 'no custom var';
|
||||
return "task completed: ".$task;
|
||||
};
|
||||
|
||||
$maxTasksBeforeRestarting = 1000;
|
||||
$currentTask = 0;
|
||||
|
||||
while(frankenphp_handle_request($handleFunc) && $currentTask++ < $maxTasksBeforeRestarting) {
|
||||
// Keep handling tasks until there are no more tasks or the max limit is reached
|
||||
}
|
||||
@@ -35,6 +35,7 @@ func (handler *regularThread) beforeScriptExecution() string {
|
||||
return handler.thread.transitionToNewHandler()
|
||||
case stateTransitionComplete:
|
||||
handler.state.set(stateReady)
|
||||
handler.thread.updateContext(false, false)
|
||||
return handler.waitForRequest()
|
||||
case stateReady:
|
||||
return handler.waitForRequest()
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// representation of a thread that handles tasks directly assigned by go
|
||||
// implements the threadHandler interface
|
||||
type taskThread struct {
|
||||
thread *phpThread
|
||||
execChan chan *task
|
||||
}
|
||||
|
||||
// task callbacks will be executed directly on the PHP thread
|
||||
// therefore having full access to the PHP runtime
|
||||
type task struct {
|
||||
callback func()
|
||||
done sync.Mutex
|
||||
}
|
||||
|
||||
func newTask(cb func()) *task {
|
||||
t := &task{callback: cb}
|
||||
t.done.Lock()
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *task) waitForCompletion() {
|
||||
t.done.Lock()
|
||||
}
|
||||
|
||||
func convertToTaskThread(thread *phpThread) *taskThread {
|
||||
handler := &taskThread{
|
||||
thread: thread,
|
||||
execChan: make(chan *task),
|
||||
}
|
||||
thread.setHandler(handler)
|
||||
return handler
|
||||
}
|
||||
|
||||
func (handler *taskThread) beforeScriptExecution() string {
|
||||
thread := handler.thread
|
||||
|
||||
switch thread.state.get() {
|
||||
case stateTransitionRequested:
|
||||
return thread.transitionToNewHandler()
|
||||
case stateBooting, stateTransitionComplete:
|
||||
thread.state.set(stateReady)
|
||||
handler.waitForTasks()
|
||||
|
||||
return handler.beforeScriptExecution()
|
||||
case stateReady:
|
||||
handler.waitForTasks()
|
||||
|
||||
return handler.beforeScriptExecution()
|
||||
case stateShuttingDown:
|
||||
// signal to stop
|
||||
return ""
|
||||
}
|
||||
panic("unexpected state: " + thread.state.name())
|
||||
}
|
||||
|
||||
func (handler *taskThread) afterScriptExecution(int) {
|
||||
panic("task threads should not execute scripts")
|
||||
}
|
||||
|
||||
func (handler *taskThread) getRequestContext() *frankenPHPContext {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (handler *taskThread) name() string {
|
||||
return "Task PHP Thread"
|
||||
}
|
||||
|
||||
func (handler *taskThread) waitForTasks() {
|
||||
for {
|
||||
select {
|
||||
case task := <-handler.execChan:
|
||||
task.callback()
|
||||
task.done.Unlock() // unlock the task to signal completion
|
||||
case <-handler.thread.drainChan:
|
||||
// thread is shutting down, do not execute the function
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *taskThread) execute(t *task) {
|
||||
handler.execChan <- t
|
||||
}
|
||||
280
threadtaskworker.go
Normal file
280
threadtaskworker.go
Normal file
@@ -0,0 +1,280 @@
|
||||
package frankenphp
|
||||
|
||||
// #include "frankenphp.h"
|
||||
// #include <php_variables.h>
|
||||
import "C"
|
||||
import (
|
||||
"errors"
|
||||
"github.com/dunglas/frankenphp/internal/fastabs"
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type taskWorker struct {
|
||||
threads []*phpThread
|
||||
threadMutex sync.RWMutex
|
||||
fileName string
|
||||
taskChan chan *pendingTask
|
||||
name string
|
||||
num int
|
||||
env PreparedEnv
|
||||
}
|
||||
|
||||
// representation of a thread that handles tasks directly assigned by go or via frankenphp_send_request()
|
||||
// can also just execute a script in a loop
|
||||
// implements the threadHandler interface
|
||||
type taskWorkerThread struct {
|
||||
thread *phpThread
|
||||
taskWorker *taskWorker
|
||||
dummyContext *frankenPHPContext
|
||||
currentTask *pendingTask
|
||||
}
|
||||
|
||||
var taskWorkers []*taskWorker
|
||||
|
||||
// EXPERIMENTAL: a task dispatched to a task worker
|
||||
type pendingTask struct {
|
||||
message any // the argument passed to frankenphp_send_request() or the return value of frankenphp_handle_request()
|
||||
done sync.RWMutex
|
||||
callback func() // optional callback for direct execution (tests)
|
||||
}
|
||||
|
||||
func initTaskWorkers(opts []workerOpt) error {
|
||||
taskWorkers = make([]*taskWorker, 0, len(opts))
|
||||
ready := sync.WaitGroup{}
|
||||
for _, opt := range opts {
|
||||
fileName, err := fastabs.FastAbs(opt.fileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if opt.maxQueueLen <= 0 {
|
||||
opt.maxQueueLen = 10000 // default queue len, TODO: unlimited?
|
||||
}
|
||||
|
||||
tw := &taskWorker{
|
||||
threads: make([]*phpThread, 0, opt.num),
|
||||
fileName: fileName,
|
||||
taskChan: make(chan *pendingTask, opt.maxQueueLen),
|
||||
name: opt.name,
|
||||
num: opt.num,
|
||||
env: opt.env,
|
||||
}
|
||||
taskWorkers = append(taskWorkers, tw)
|
||||
|
||||
// start the actual PHP threads
|
||||
ready.Add(tw.num)
|
||||
for i := 0; i < tw.num; i++ {
|
||||
thread := getInactivePHPThread()
|
||||
convertToTaskWorkerThread(thread, tw)
|
||||
go func(thread *phpThread) {
|
||||
thread.state.waitFor(stateReady)
|
||||
ready.Done()
|
||||
}(thread)
|
||||
}
|
||||
}
|
||||
ready.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func drainTaskWorkers() {
|
||||
for _, tw := range taskWorkers {
|
||||
tw.drainQueue()
|
||||
}
|
||||
}
|
||||
|
||||
func convertToTaskWorkerThread(thread *phpThread, tw *taskWorker) *taskWorkerThread {
|
||||
handler := &taskWorkerThread{
|
||||
thread: thread,
|
||||
taskWorker: tw,
|
||||
}
|
||||
thread.setHandler(handler)
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) beforeScriptExecution() string {
|
||||
thread := handler.thread
|
||||
|
||||
switch thread.state.get() {
|
||||
case stateTransitionRequested:
|
||||
handler.taskWorker.detach(thread)
|
||||
|
||||
return thread.transitionToNewHandler()
|
||||
case stateBooting, stateTransitionComplete:
|
||||
tw := handler.taskWorker
|
||||
tw.threadMutex.Lock()
|
||||
tw.threads = append(tw.threads, thread)
|
||||
tw.threadMutex.Unlock()
|
||||
thread.state.set(stateReady)
|
||||
thread.updateContext(false, true)
|
||||
|
||||
return handler.setupWorkerScript()
|
||||
case stateReady:
|
||||
|
||||
return handler.setupWorkerScript()
|
||||
case stateRestarting:
|
||||
thread.state.set(stateYielding)
|
||||
thread.state.waitFor(stateReady, stateShuttingDown)
|
||||
|
||||
return handler.beforeScriptExecution()
|
||||
case stateShuttingDown:
|
||||
handler.taskWorker.detach(thread)
|
||||
// signal to stop
|
||||
return ""
|
||||
}
|
||||
panic("unexpected state: " + thread.state.name())
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) setupWorkerScript() string {
|
||||
fc, err := newDummyContext(handler.taskWorker.fileName, WithRequestPreparedEnv(handler.taskWorker.env))
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
handler.dummyContext = fc
|
||||
clearSandboxedEnv(handler.thread)
|
||||
|
||||
return handler.taskWorker.fileName
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) afterScriptExecution(int) {
|
||||
// restart the script
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) getRequestContext() *frankenPHPContext {
|
||||
return handler.dummyContext
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) name() string {
|
||||
return "Task Worker PHP Thread - " + handler.taskWorker.fileName
|
||||
}
|
||||
|
||||
func (tw *taskWorker) detach(thread *phpThread) {
|
||||
tw.threadMutex.Lock()
|
||||
defer tw.threadMutex.Unlock()
|
||||
for i, t := range tw.threads {
|
||||
if t == thread {
|
||||
tw.threads = append(tw.threads[:i], tw.threads[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure all tasks are done by re-queuing them until the channel is empty
|
||||
func (tw *taskWorker) drainQueue() {
|
||||
for {
|
||||
select {
|
||||
case pt := <-tw.taskChan:
|
||||
tw.taskChan <- pt
|
||||
pt.done.RLock() // wait for completion
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tw *taskWorker) dispatch(t *pendingTask) error {
|
||||
t.done.Lock()
|
||||
select {
|
||||
case tw.taskChan <- t:
|
||||
return nil
|
||||
default:
|
||||
return errors.New("Task worker queue is full, cannot dispatch task: " + tw.name)
|
||||
}
|
||||
}
|
||||
|
||||
func getTaskWorkerByName(name string) *taskWorker {
|
||||
for _, w := range taskWorkers {
|
||||
if w.name == name {
|
||||
return w
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//export go_frankenphp_worker_handle_task
|
||||
func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) *C.zval {
|
||||
thread := phpThreads[threadIndex]
|
||||
handler, _ := thread.handler.(*taskWorkerThread)
|
||||
thread.Unpin()
|
||||
thread.state.markAsWaiting(true)
|
||||
|
||||
select {
|
||||
case task := <-handler.taskWorker.taskChan:
|
||||
handler.currentTask = task
|
||||
thread.state.markAsWaiting(false)
|
||||
|
||||
// if the task has a callback, execute it (see types_test.go)
|
||||
if task.callback != nil {
|
||||
task.callback()
|
||||
go_frankenphp_finish_task(threadIndex, nil)
|
||||
|
||||
return go_frankenphp_worker_handle_task(threadIndex)
|
||||
}
|
||||
|
||||
zval := phpValue(task.message)
|
||||
task.message = nil // free memory
|
||||
thread.Pin(unsafe.Pointer(zval)) // TODO: refactor types.go so no pinning is required
|
||||
|
||||
return zval
|
||||
case <-handler.thread.drainChan:
|
||||
thread.state.markAsWaiting(false)
|
||||
// send an empty task to drain the thread
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//export go_frankenphp_finish_task
|
||||
func go_frankenphp_finish_task(threadIndex C.uintptr_t, zv *C.zval) {
|
||||
thread := phpThreads[threadIndex]
|
||||
handler, ok := thread.handler.(*taskWorkerThread)
|
||||
if !ok {
|
||||
panic("thread is not a task thread: " + thread.handler.name())
|
||||
}
|
||||
|
||||
if zv != nil {
|
||||
result, err := goValue[any](zv)
|
||||
if err != nil {
|
||||
panic("failed to convert go_frankenphp_finish_task() return value: " + err.Error())
|
||||
}
|
||||
handler.currentTask.message = result
|
||||
}
|
||||
handler.currentTask.done.Unlock()
|
||||
handler.currentTask = nil
|
||||
}
|
||||
|
||||
//export go_frankenphp_send_request
|
||||
func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.char, nameLen C.size_t) *C.char {
|
||||
if zv == nil {
|
||||
return phpThreads[threadIndex].pinCString("Task argument cannot be null")
|
||||
}
|
||||
|
||||
var tw *taskWorker
|
||||
if nameLen != 0 {
|
||||
tw = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen)))
|
||||
} else if len(taskWorkers) != 0 {
|
||||
tw = taskWorkers[0]
|
||||
}
|
||||
|
||||
if tw == nil {
|
||||
return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen)))
|
||||
}
|
||||
|
||||
// convert the argument of frankenphp_send_request() to a Go value
|
||||
goArg, err := goValue[any](zv)
|
||||
if err != nil {
|
||||
return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error())
|
||||
}
|
||||
|
||||
err = tw.dispatch(&pendingTask{message: goArg})
|
||||
|
||||
if err != nil {
|
||||
return phpThreads[threadIndex].pinCString(err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
89
threadtaskworker_test.go
Normal file
89
threadtaskworker_test.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log/slog"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opts ...RequestOption) {
|
||||
t.Helper()
|
||||
r := httptest.NewRequest("GET", url, nil)
|
||||
w := httptest.NewRecorder()
|
||||
req, err := NewRequestWithContext(r, opts...)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, ServeHTTP(w, req))
|
||||
assert.Contains(t, w.Body.String(), expectedBodyContains)
|
||||
}
|
||||
|
||||
func TestDispatchToTaskWorkerFromWorker(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
|
||||
logger := slog.New(handler)
|
||||
|
||||
assert.NoError(t, Init(
|
||||
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
|
||||
WithWorkers("worker1", "./testdata/tasks/task-dispatcher-string.php", 1),
|
||||
WithNumThreads(3), // regular thread, task worker thread, dispatcher threads
|
||||
WithLogger(logger),
|
||||
))
|
||||
|
||||
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-string.php?count=4", "dispatched 4 tasks")
|
||||
|
||||
// wait and shutdown to ensure all logs are flushed
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
Shutdown()
|
||||
|
||||
// task output appears in logs at info level
|
||||
logOutput := buf.String()
|
||||
assert.Contains(t, logOutput, "task0")
|
||||
assert.Contains(t, logOutput, "task1")
|
||||
assert.Contains(t, logOutput, "task2")
|
||||
assert.Contains(t, logOutput, "task3")
|
||||
}
|
||||
|
||||
func TestDispatchArrayToTaskWorker(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
|
||||
logger := slog.New(handler)
|
||||
|
||||
assert.NoError(t, Init(
|
||||
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
|
||||
WithWorkers("worker2", "./testdata/tasks/task-dispatcher-array.php", 1),
|
||||
WithNumThreads(3), // regular thread, task worker thread, dispatcher thread
|
||||
WithLogger(logger),
|
||||
))
|
||||
|
||||
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-array.php?count=1", "dispatched 1 tasks")
|
||||
|
||||
// wait and shutdown to ensure all logs are flushed
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
Shutdown()
|
||||
|
||||
// task output appears in logs at info level
|
||||
logOutput := buf.String()
|
||||
assert.Contains(t, logOutput, "array task0")
|
||||
}
|
||||
|
||||
func TestDispatchToMultipleWorkers(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
|
||||
logger := slog.New(handler)
|
||||
|
||||
assert.NoError(t, Init(
|
||||
WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
|
||||
WithWorkers("worker2", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
|
||||
WithNumThreads(4),
|
||||
WithLogger(logger),
|
||||
))
|
||||
defer Shutdown()
|
||||
|
||||
script := "http://example.com/testdata/tasks/task-dispatcher-string.php"
|
||||
assertGetRequest(t, script+"?count=1&worker=worker1", "dispatched 1 tasks")
|
||||
assertGetRequest(t, script+"?count=1&worker=worker2", "dispatched 1 tasks")
|
||||
assertGetRequest(t, script+"?count=1&worker=worker3", "No worker found to handle this task") // fail
|
||||
}
|
||||
@@ -62,6 +62,7 @@ func (handler *workerThread) beforeScriptExecution() string {
|
||||
if handler.externalWorker != nil {
|
||||
handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex)
|
||||
}
|
||||
handler.thread.updateContext(true, false)
|
||||
setupWorkerScript(handler, handler.worker)
|
||||
return handler.worker.fileName
|
||||
case stateShuttingDown:
|
||||
|
||||
2
types.c
2
types.c
@@ -31,6 +31,8 @@ void __zval_double__(zval *zv, double val) { ZVAL_DOUBLE(zv, val); }
|
||||
|
||||
void __zval_string__(zval *zv, zend_string *str) { ZVAL_STR(zv, str); }
|
||||
|
||||
void __zval_empty_string__(zval *zv) { ZVAL_EMPTY_STRING(zv); }
|
||||
|
||||
void __zval_arr__(zval *zv, zend_array *arr) { ZVAL_ARR(zv, arr); }
|
||||
|
||||
zend_array *__zend_new_array__(uint32_t size) { return zend_new_array(size); }
|
||||
|
||||
15
types.go
15
types.go
@@ -263,7 +263,6 @@ func goValue[T any](zval *C.zval) (res T, err error) {
|
||||
resZero T
|
||||
)
|
||||
t := C.zval_get_type(zval)
|
||||
|
||||
switch t {
|
||||
case C.IS_NULL:
|
||||
resAny = any(nil)
|
||||
@@ -382,6 +381,10 @@ func phpValue(value any) *C.zval {
|
||||
case float64:
|
||||
C.__zval_double__(&zval, C.double(v))
|
||||
case string:
|
||||
if v == "" {
|
||||
C.__zval_empty_string__(&zval)
|
||||
break
|
||||
}
|
||||
str := (*C.zend_string)(PHPString(v, false))
|
||||
C.__zval_string__(&zval, str)
|
||||
case map[string]any:
|
||||
@@ -435,3 +438,13 @@ func extractZvalValue(zval *C.zval, expectedType C.uint8_t) (unsafe.Pointer, err
|
||||
|
||||
return nil, fmt.Errorf("unsupported zval type %d", expectedType)
|
||||
}
|
||||
|
||||
// used for cleanup in tests
|
||||
func zvalPtrDtor(p unsafe.Pointer) {
|
||||
C.zval_ptr_dtor((*C.zval)(p))
|
||||
}
|
||||
|
||||
// used for cleanup in tests
|
||||
func zendStringRelease(p unsafe.Pointer) {
|
||||
C.zend_string_release((*C.zend_string)(p))
|
||||
}
|
||||
|
||||
1
types.h
1
types.h
@@ -19,6 +19,7 @@ void __zval_bool__(zval *zv, bool val);
|
||||
void __zval_long__(zval *zv, zend_long val);
|
||||
void __zval_double__(zval *zv, double val);
|
||||
void __zval_string__(zval *zv, zend_string *str);
|
||||
void __zval_empty_string__(zval *zv);
|
||||
void __zval_arr__(zval *zv, zend_array *arr);
|
||||
zend_array *__zend_new_array__(uint32_t size);
|
||||
|
||||
|
||||
@@ -1,37 +1,50 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/exp/zapslog"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
// execute the function on a PHP thread directly
|
||||
// this is necessary if tests make use of PHP's internal allocation
|
||||
func testOnDummyPHPThread(t *testing.T, test func()) {
|
||||
func testOnDummyPHPThread(t *testing.T, cb func()) {
|
||||
t.Helper()
|
||||
logger = slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
_, err := initPHPThreads(1, 1, nil) // boot 1 thread
|
||||
assert.NoError(t, err)
|
||||
handler := convertToTaskThread(phpThreads[0])
|
||||
logger = slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))
|
||||
assert.NoError(t, Init(
|
||||
WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
|
||||
WithNumThreads(2),
|
||||
WithLogger(logger),
|
||||
))
|
||||
defer Shutdown()
|
||||
|
||||
task := newTask(test)
|
||||
handler.execute(task)
|
||||
task.waitForCompletion()
|
||||
assert.NoError(t, executeOnPHPThread(cb, "tw"))
|
||||
}
|
||||
|
||||
drainPHPThreads()
|
||||
// executeOnPHPThread executes the callback func() directly on a task worker thread
|
||||
// useful for testing purposes when dealing with PHP allocations
|
||||
func executeOnPHPThread(callback func(), taskWorkerName string) error {
|
||||
tw := getTaskWorkerByName(taskWorkerName)
|
||||
if tw == nil {
|
||||
return errors.New("no task worker found with name " + taskWorkerName)
|
||||
}
|
||||
|
||||
return tw.dispatch(&pendingTask{callback: callback})
|
||||
}
|
||||
|
||||
func TestGoString(t *testing.T) {
|
||||
testOnDummyPHPThread(t, func() {
|
||||
originalString := "Hello, World!"
|
||||
|
||||
convertedString := GoString(PHPString(originalString, false))
|
||||
phpString := PHPString(originalString, false)
|
||||
defer zendStringRelease(phpString)
|
||||
|
||||
assert.Equal(t, originalString, convertedString, "string -> zend_string -> string should yield an equal string")
|
||||
assert.Equal(t, originalString, GoString(phpString), "string -> zend_string -> string should yield an equal string")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -42,7 +55,9 @@ func TestPHPMap(t *testing.T) {
|
||||
"foo2": "bar2",
|
||||
}
|
||||
|
||||
convertedMap, err := GoMap[string](PHPMap(originalMap))
|
||||
phpArray := PHPMap(originalMap)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedMap, err := GoMap[string](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, originalMap, convertedMap, "associative array should be equal after conversion")
|
||||
@@ -59,7 +74,9 @@ func TestOrderedPHPAssociativeArray(t *testing.T) {
|
||||
Order: []string{"foo2", "foo1"},
|
||||
}
|
||||
|
||||
convertedArray, err := GoAssociativeArray[string](PHPAssociativeArray(originalArray))
|
||||
phpArray := PHPAssociativeArray(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedArray, err := GoAssociativeArray[string](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, originalArray, convertedArray, "associative array should be equal after conversion")
|
||||
@@ -70,7 +87,9 @@ func TestPHPPackedArray(t *testing.T) {
|
||||
testOnDummyPHPThread(t, func() {
|
||||
originalSlice := []string{"bar1", "bar2"}
|
||||
|
||||
convertedSlice, err := GoPackedArray[string](PHPPackedArray(originalSlice))
|
||||
phpArray := PHPPackedArray(originalSlice)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedSlice, err := GoPackedArray[string](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, originalSlice, convertedSlice, "slice should be equal after conversion")
|
||||
@@ -85,7 +104,9 @@ func TestPHPPackedArrayToGoMap(t *testing.T) {
|
||||
"1": "bar2",
|
||||
}
|
||||
|
||||
convertedMap, err := GoMap[string](PHPPackedArray(originalSlice))
|
||||
phpArray := PHPPackedArray(originalSlice)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedMap, err := GoMap[string](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, expectedMap, convertedMap, "convert a packed to an associative array")
|
||||
@@ -103,7 +124,9 @@ func TestPHPAssociativeArrayToPacked(t *testing.T) {
|
||||
}
|
||||
expectedSlice := []string{"bar1", "bar2"}
|
||||
|
||||
convertedSlice, err := GoPackedArray[string](PHPAssociativeArray(originalArray))
|
||||
phpArray := PHPAssociativeArray(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedSlice, err := GoPackedArray[string](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, expectedSlice, convertedSlice, "convert an associative array to a slice")
|
||||
@@ -126,7 +149,9 @@ func TestNestedMixedArray(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
convertedArray, err := GoMap[any](PHPMap(originalArray))
|
||||
phpArray := PHPMap(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
convertedArray, err := GoMap[any](phpArray)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, originalArray, convertedArray, "nested mixed array should be equal after conversion")
|
||||
|
||||
75
worker.go
75
worker.go
@@ -9,7 +9,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/dunglas/frankenphp/internal/fastabs"
|
||||
"github.com/dunglas/frankenphp/internal/watcher"
|
||||
)
|
||||
|
||||
// represents a worker script and can have many threads assigned to it
|
||||
@@ -26,15 +25,12 @@ type worker struct {
|
||||
}
|
||||
|
||||
var (
|
||||
workers []*worker
|
||||
watcherIsEnabled bool
|
||||
workers []*worker
|
||||
)
|
||||
|
||||
func initWorkers(opt []workerOpt) error {
|
||||
workers = make([]*worker, 0, len(opt))
|
||||
workersReady := sync.WaitGroup{}
|
||||
directoriesToWatch := getDirectoriesToWatch(opt)
|
||||
watcherIsEnabled = len(directoriesToWatch) > 0
|
||||
|
||||
for _, o := range opt {
|
||||
w, err := newWorker(o)
|
||||
@@ -64,15 +60,6 @@ func initWorkers(opt []workerOpt) error {
|
||||
|
||||
workersReady.Wait()
|
||||
|
||||
if !watcherIsEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
watcherIsEnabled = true
|
||||
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -144,36 +131,38 @@ func DrainWorkers() {
|
||||
func drainWorkerThreads() []*phpThread {
|
||||
ready := sync.WaitGroup{}
|
||||
drainedThreads := make([]*phpThread, 0)
|
||||
threadsToDrain := make([]*phpThread, 0)
|
||||
for _, worker := range workers {
|
||||
worker.threadMutex.RLock()
|
||||
ready.Add(len(worker.threads))
|
||||
for _, thread := range worker.threads {
|
||||
if !thread.state.requestSafeStateChange(stateRestarting) {
|
||||
ready.Done()
|
||||
// no state change allowed == thread is shutting down
|
||||
// we'll proceed to restart all other threads anyways
|
||||
continue
|
||||
}
|
||||
close(thread.drainChan)
|
||||
drainedThreads = append(drainedThreads, thread)
|
||||
go func(thread *phpThread) {
|
||||
thread.state.waitFor(stateYielding)
|
||||
ready.Done()
|
||||
}(thread)
|
||||
}
|
||||
threadsToDrain = append(threadsToDrain, worker.threads...)
|
||||
worker.threadMutex.RUnlock()
|
||||
}
|
||||
|
||||
for _, taskWorker := range taskWorkers {
|
||||
taskWorker.threadMutex.RLock()
|
||||
threadsToDrain = append(threadsToDrain, taskWorker.threads...)
|
||||
taskWorker.threadMutex.RUnlock()
|
||||
}
|
||||
|
||||
for _, thread := range threadsToDrain {
|
||||
if !thread.state.requestSafeStateChange(stateRestarting) {
|
||||
// no state change allowed == thread is shutting down
|
||||
// we'll proceed to restart all other threads anyways
|
||||
continue
|
||||
}
|
||||
ready.Add(1)
|
||||
close(thread.drainChan)
|
||||
drainedThreads = append(drainedThreads, thread)
|
||||
go func(thread *phpThread) {
|
||||
thread.state.waitFor(stateYielding)
|
||||
ready.Done()
|
||||
}(thread)
|
||||
}
|
||||
ready.Wait()
|
||||
|
||||
return drainedThreads
|
||||
}
|
||||
|
||||
func drainWatcher() {
|
||||
if watcherIsEnabled {
|
||||
watcher.DrainWatcher()
|
||||
}
|
||||
}
|
||||
|
||||
// RestartWorkers attempts to restart all workers gracefully
|
||||
func RestartWorkers() {
|
||||
// disallow scaling threads while restarting workers
|
||||
@@ -188,14 +177,6 @@ func RestartWorkers() {
|
||||
}
|
||||
}
|
||||
|
||||
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
|
||||
directoriesToWatch := []string{}
|
||||
for _, w := range workerOpts {
|
||||
directoriesToWatch = append(directoriesToWatch, w.watch...)
|
||||
}
|
||||
return directoriesToWatch
|
||||
}
|
||||
|
||||
func (worker *worker) attachThread(thread *phpThread) {
|
||||
worker.threadMutex.Lock()
|
||||
worker.threads = append(worker.threads, thread)
|
||||
@@ -213,14 +194,6 @@ func (worker *worker) detachThread(thread *phpThread) {
|
||||
worker.threadMutex.Unlock()
|
||||
}
|
||||
|
||||
func (worker *worker) countThreads() int {
|
||||
worker.threadMutex.RLock()
|
||||
l := len(worker.threads)
|
||||
worker.threadMutex.RUnlock()
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
func (worker *worker) handleRequest(fc *frankenPHPContext) {
|
||||
metrics.StartWorkerRequest(worker.name)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user