Compare commits

...

50 Commits

Author SHA1 Message Date
Alliballibaba
fd40e62cb8 cleanup. 2025-10-26 21:25:19 +01:00
Alliballibaba
acf423f9f0 adds comments. 2025-10-26 20:57:00 +01:00
Alliballibaba
02c27fc2a9 Fixes library tests. 2025-10-26 20:56:08 +01:00
Alliballibaba
d54f736db7 simplifications 2025-10-26 20:52:19 +01:00
Alliballibaba
694b6188c0 Formatting. 2025-10-26 20:39:31 +01:00
Alliballibaba
b275cd58f8 Merge branch 'main' into feat/task-threads
# Conflicts:
#	types_test.go
2025-10-26 20:24:42 +01:00
Alliballibaba
3f63a4d137 Combines frankenphp_handle_task() and frankenphp_handle_request(). 2025-10-26 20:21:19 +01:00
Alliballibaba
e801a49f3f Prevents test race condition. 2025-10-11 23:36:57 +02:00
Alliballibaba
12b6aaeac6 Prevents test race condition. 2025-10-11 23:36:48 +02:00
Alliballibaba
a5a9351020 Adds sleep back in. 2025-10-11 23:29:13 +02:00
Alliballibaba
03d886d32e Removes more code. 2025-10-11 23:23:58 +02:00
Alliballibaba
8144a06ebb clang-format. 2025-10-11 22:55:31 +02:00
Alliballibaba
8a5d489c85 Returns error messages directly to PHP. 2025-10-11 22:53:44 +02:00
Alliballibaba
6b9c236d9a Removes docs (still experimental) 2025-10-11 22:41:15 +02:00
Alliballibaba
02a3b3f56d Fixes build error. 2025-10-11 22:38:51 +02:00
Alliballibaba
117b415e8b Merge branch 'main' into feat/task-threads 2025-10-11 22:37:20 +02:00
Alliballibaba
7565628516 Allows setting queue len. 2025-10-11 22:21:02 +02:00
Alliballibaba
05bf065a1b Uses goValue and phpValue for task dispatching. 2025-10-11 22:13:03 +02:00
Alliballibaba
58d1761fe8 Simplifies by removing args. 2025-10-11 21:11:14 +02:00
Alliballibaba
b23f3f833e Foxes pinning. 2025-10-07 22:20:27 +02:00
Alliballibaba
268d294313 Fixes pinning. 2025-10-07 21:29:29 +02:00
Alliballibaba
77fec2b4a7 Adds docs. 2025-10-07 21:23:40 +02:00
Alliballibaba
df7e77d3a6 Adjusts naming. 2025-10-07 21:08:16 +02:00
Alliballibaba
0dff2a2790 Adjusts naming. 2025-10-07 21:04:32 +02:00
Alliballibaba
0c0a0cb19b Adds more tests. 2025-10-07 20:39:54 +02:00
Alliballibaba
99bb21f646 Allows setting args with task-workers. 2025-10-05 11:33:19 +02:00
Alliballibaba
b8addd76e7 Merge branch 'main' into feat/task-threads 2025-10-01 22:56:25 +02:00
Alliballibaba
639817ebef Merge branch 'main' into feat/task-threads
# Conflicts:
#	worker.go
2025-09-22 21:06:16 +02:00
Alliballibaba
9c36ed4624 Merge branch 'main' into feat/task-threads 2025-09-20 22:28:10 +02:00
Alliballibaba
83c7a88ec7 Cleanup. 2025-09-20 22:27:30 +02:00
Alliballibaba
c16665ae78 Allows direct execution on tasks and correctly frees in types_test. 2025-09-18 23:35:26 +02:00
Alliballibaba
0d43efff35 Adjusts name. 2025-09-18 22:35:57 +02:00
Alliballibaba
eb2b575cbc Fixes thread attaching. 2025-09-18 22:35:08 +02:00
Alliballibaba
f5e6a045b9 Fixes small issues. 2025-09-18 22:28:54 +02:00
Alliballibaba
7a2bb89c9b Waits briefly to ensure logs are flushed 2025-09-18 22:09:18 +02:00
Alliballibaba
2297616552 Adjusts queue len. 2025-09-18 21:53:14 +02:00
Alliballibaba
7982b3af59 Adds max queue len and more tests. 2025-09-18 21:50:01 +02:00
Alliballibaba
6c3e1d6af6 Fixes race. 2025-09-17 23:04:37 +02:00
Alliballibaba
2387a9d74c Allows prepared env. 2025-09-17 22:53:03 +02:00
Alliballibaba
7438edd676 Adds direct dispatching test. 2025-09-17 22:39:59 +02:00
Alliballibaba
9c4cf7e2d8 Throws on task handling failure. 2025-09-17 22:10:39 +02:00
Alliballibaba
7f52e2d116 Allows watching with threads. 2025-09-17 21:49:10 +02:00
Alliballibaba
f43c8bb1bf Removes log. 2025-09-16 20:50:51 +02:00
Alliballibaba
6e79380ddc formatting 2025-09-16 18:25:07 +02:00
Alliballibaba
a102da8171 Adds tests and optimizations. 2025-09-16 18:24:33 +02:00
Alliballibaba
a6999209d3 test 2025-09-15 00:21:58 +02:00
Alliballibaba
65e11372c1 test 2025-09-14 23:50:48 +02:00
Alliballibaba
8ad2351abc FOrmatting. 2025-09-14 23:46:53 +02:00
Alliballibaba
abdb279bd3 Initial testing. 2025-09-14 23:45:40 +02:00
Alliballibaba
d852f1cd4b test123 2025-09-09 22:26:47 +02:00
23 changed files with 690 additions and 168 deletions

View File

@@ -33,6 +33,8 @@ type FrankenPHPApp struct {
MaxThreads int `json:"max_threads,omitempty"`
// Workers configures the worker scripts to start.
Workers []workerConfig `json:"workers,omitempty"`
// TaskWorkers configures the task worker scripts to start.
TaskWorkers []workerConfig `json:"task_workers,omitempty"`
// Overwrites the default php ini configuration
PhpIni map[string]string `json:"php_ini,omitempty"`
// The maximum amount of time a request may be stalled waiting for a thread
@@ -127,6 +129,15 @@ func (f *FrankenPHPApp) Start() error {
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...))
}
for _, tw := range f.TaskWorkers {
workerOpts := []frankenphp.WorkerOption{
frankenphp.WithWorkerEnv(tw.Env),
frankenphp.WithWorkerWatchMode(tw.Watch),
frankenphp.AsTaskWorker(true, 0), // TODO: maxQueueLen configurable here?
}
opts = append(opts, frankenphp.WithWorkers(tw.Name, repl.ReplaceKnown(tw.FileName, ""), tw.Num, workerOpts...))
}
frankenphp.Shutdown()
if err := frankenphp.Init(opts...); err != nil {
@@ -234,6 +245,13 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}
}
case "task_worker":
twc, err := parseWorkerConfig(d)
if err != nil {
return err
}
f.TaskWorkers = append(f.TaskWorkers, twc)
case "worker":
wc, err := parseWorkerConfig(d)
if err != nil {

View File

@@ -1423,3 +1423,31 @@ func TestWorkerMatchDirectiveWithoutFileServer(t *testing.T) {
// the request should completely fall through the php_server module
tester.AssertGetResponse("http://localhost:"+testPort+"/static.txt", http.StatusNotFound, "Request falls through")
}
func TestServerWithTaskWorker(t *testing.T) {
tester := caddytest.NewTester(t)
taskWorker, err := fastabs.FastAbs("../testdata/tasks/task-worker.php")
require.NoError(t, err)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
frankenphp {
num_threads 2
task_worker `+taskWorker+` {
num 1
}
}
}
`, "caddyfile")
debugState := getDebugState(t, tester)
require.Len(t, debugState.ThreadDebugStates, 2, "there should be 3 threads")
require.Equal(
t,
debugState.ThreadDebugStates[1].Name,
"Task Worker PHP Thread - "+taskWorker,
"the second spawned thread should be the task worker",
)
}

6
cgi.go
View File

@@ -277,13 +277,13 @@ func splitPos(path string, splitPath []string) int {
// See: https://github.com/php/php-src/blob/345e04b619c3bc11ea17ee02cdecad6ae8ce5891/main/SAPI.h#L72
//
//export go_update_request_info
func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) C.bool {
func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info) {
thread := phpThreads[threadIndex]
fc := thread.getRequestContext()
request := fc.request
if request == nil {
return C.bool(fc.worker != nil)
return
}
authUser, authPassword, ok := request.BasicAuth()
@@ -311,8 +311,6 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info)
info.request_uri = thread.pinCString(request.URL.RequestURI())
info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor)
return C.bool(fc.worker != nil)
}
// SanitizedPathJoin performs filepath.Join(root, reqPath) that

View File

@@ -73,6 +73,7 @@ frankenphp_config frankenphp_get_config() {
bool should_filter_var = 0;
__thread uintptr_t thread_index;
__thread bool is_worker_thread = false;
__thread bool is_task_worker_thread = false;
__thread zval *os_environment = NULL;
static void frankenphp_update_request_context() {
@@ -82,7 +83,12 @@ static void frankenphp_update_request_context() {
/* status It is not reset by zend engine, set it to 200. */
SG(sapi_headers).http_response_code = 200;
is_worker_thread = go_update_request_info(thread_index, &SG(request_info));
go_update_request_info(thread_index, &SG(request_info));
}
void frankenphp_update_thread_context(bool is_worker, bool is_task_worker) {
is_worker_thread = is_worker;
is_task_worker_thread = is_task_worker;
}
static void frankenphp_free_request_context() {
@@ -411,6 +417,49 @@ PHP_FUNCTION(frankenphp_response_headers) /* {{{ */
}
/* }}} */
/* Handle a message in task worker mode */
static bool frankenphp_handle_message(zend_fcall_info fci,
zend_fcall_info_cache fcc) {
zval *arg = go_frankenphp_worker_handle_task(thread_index);
if (arg == NULL) {
return false;
}
/* Call the PHP func passed to frankenphp_handle_request() */
zval retval = {0};
fci.size = sizeof fci;
fci.retval = &retval;
fci.params = arg;
fci.param_count = 1;
zend_bool status = zend_call_function(&fci, &fcc) == SUCCESS;
if (!status || Z_TYPE(retval) == IS_UNDEF) {
go_frankenphp_finish_task(thread_index, NULL);
zval_ptr_dtor(arg);
} else {
go_frankenphp_finish_task(thread_index, &retval);
}
zval_ptr_dtor(&retval);
/*
* If an exception occurred, print the message to the client before
* exiting
*/
if (EG(exception) && !zend_is_unwind_exit(EG(exception)) &&
!zend_is_graceful_exit(EG(exception))) {
zend_exception_error(EG(exception), E_ERROR);
zend_bailout();
}
zend_try { php_output_end_all(); }
zend_end_try();
zval_ptr_dtor(arg);
return true;
}
PHP_FUNCTION(frankenphp_handle_request) {
zend_fcall_info fci;
zend_fcall_info_cache fcc;
@@ -420,6 +469,13 @@ PHP_FUNCTION(frankenphp_handle_request) {
ZEND_PARSE_PARAMETERS_END();
if (!is_worker_thread) {
/* thread is a task worker
* handle the message and do not reset globals */
if (is_task_worker_thread) {
bool keep_running = frankenphp_handle_message(fci, fcc);
RETURN_BOOL(keep_running);
}
/* not a worker, throw an error */
zend_throw_exception(
spl_ce_RuntimeException,
@@ -484,6 +540,25 @@ PHP_FUNCTION(frankenphp_handle_request) {
RETURN_TRUE;
}
PHP_FUNCTION(frankenphp_send_request) {
zval *zv;
char *worker_name = NULL;
size_t worker_name_len = 0;
ZEND_PARSE_PARAMETERS_START(1, 2);
Z_PARAM_ZVAL(zv);
Z_PARAM_OPTIONAL
Z_PARAM_STRING(worker_name, worker_name_len);
ZEND_PARSE_PARAMETERS_END();
char *error = go_frankenphp_send_request(thread_index, zv, worker_name,
worker_name_len);
if (error) {
zend_throw_exception(spl_ce_RuntimeException, error, 0);
RETURN_THROWS();
}
}
PHP_FUNCTION(headers_send) {
zend_long response_code = 200;

View File

@@ -37,6 +37,8 @@ import (
"unsafe"
// debug on Linux
//_ "github.com/ianlancetaylor/cgosymbolizer"
"github.com/dunglas/frankenphp/internal/watcher"
)
type contextKeyStruct struct{}
@@ -52,7 +54,8 @@ var (
ErrScriptExecution = errors.New("error during PHP script execution")
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
isRunning bool
isRunning bool
watcherIsEnabled bool
loggerMu sync.RWMutex
logger *slog.Logger
@@ -151,6 +154,10 @@ func calculateMaxThreads(opt *opt) (int, int, int, error) {
numWorkers += opt.workers[i].num
}
for _, tw := range opt.taskWorkers {
numWorkers += tw.num
}
numThreadsIsSet := opt.numThreads > 0
maxThreadsIsSet := opt.maxThreads != 0
maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go)
@@ -279,10 +286,23 @@ func Init(options ...Option) error {
convertToRegularThread(getInactivePHPThread())
}
directoriesToWatch := getDirectoriesToWatch(append(opt.workers, opt.taskWorkers...))
watcherIsEnabled = len(directoriesToWatch) > 0 // watcherIsEnabled needs to be set before initWorkers()
if err := initWorkers(opt.workers); err != nil {
return err
}
if err := initTaskWorkers(opt.taskWorkers); err != nil {
return err
}
if watcherIsEnabled {
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
return err
}
}
initAutoScaling(mainThread)
ctx := context.Background()
@@ -300,8 +320,11 @@ func Shutdown() {
return
}
drainWatcher()
if watcherIsEnabled {
watcher.DrainWatcher()
}
drainAutoScaling()
drainTaskWorkers()
drainPHPThreads()
metrics.Shutdown()
@@ -629,3 +652,12 @@ func timeoutChan(timeout time.Duration) <-chan time.Time {
return time.After(timeout)
}
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
directoriesToWatch := []string{}
for _, w := range workerOpts {
directoriesToWatch = append(directoriesToWatch, w.watch...)
}
return directoriesToWatch
}

View File

@@ -86,4 +86,6 @@ void frankenphp_register_bulk(
void register_extensions(zend_module_entry *m, int len);
void frankenphp_update_thread_context(bool is_worker, bool is_task_worker);
#endif

View File

@@ -4,6 +4,8 @@
function frankenphp_handle_request(callable $callback): bool {}
function frankenphp_send_request(mixed $task, string $workerName = ''): void {}
function headers_send(int $status = 200): int {}
function frankenphp_finish_request(): bool {}

View File

@@ -6,6 +6,12 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1,
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_send_request, 0, 1,
IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, task, IS_MIXED, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "\"\"")
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200")
ZEND_END_ARG_INFO()
@@ -31,6 +37,7 @@ ZEND_END_ARG_INFO()
#define arginfo_apache_response_headers arginfo_frankenphp_response_headers
ZEND_FUNCTION(frankenphp_handle_request);
ZEND_FUNCTION(frankenphp_send_request);
ZEND_FUNCTION(headers_send);
ZEND_FUNCTION(frankenphp_finish_request);
ZEND_FUNCTION(frankenphp_request_headers);
@@ -39,6 +46,7 @@ ZEND_FUNCTION(frankenphp_response_headers);
// clang-format off
static const zend_function_entry ext_functions[] = {
ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request)
ZEND_FE(frankenphp_send_request, arginfo_frankenphp_send_request)
ZEND_FE(headers_send, arginfo_headers_send)
ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request)
ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request)

View File

@@ -22,6 +22,7 @@ type opt struct {
numThreads int
maxThreads int
workers []workerOpt
taskWorkers []workerOpt
logger *slog.Logger
metrics Metrics
phpIni map[string]string
@@ -35,6 +36,8 @@ type workerOpt struct {
env PreparedEnv
watch []string
maxConsecutiveFailures int
isTaskWorker bool
maxQueueLen int
}
// WithNumThreads configures the number of PHP threads to start.
@@ -80,7 +83,11 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption)
}
}
o.workers = append(o.workers, worker)
if worker.isTaskWorker {
o.taskWorkers = append(o.taskWorkers, worker)
} else {
o.workers = append(o.workers, worker)
}
return nil
}
@@ -141,3 +148,14 @@ func WithMaxWaitTime(maxWaitTime time.Duration) Option {
return nil
}
}
// EXPERIMENTAL: AsTaskWorker configures the worker as a task worker.
// no http requests will be handled.
// no globals resetting will be performed between tasks.
func AsTaskWorker(isTaskWorker bool, maxQueueLen int) WorkerOption {
return func(w *workerOpt) error {
w.isTaskWorker = isTaskWorker
w.maxQueueLen = maxQueueLen
return nil
}
}

View File

@@ -132,6 +132,10 @@ func (thread *phpThread) pinCString(s string) *C.char {
return thread.pinString(s + "\x00")
}
func (thread *phpThread) updateContext(isWorker bool, isTaskWorker bool) {
C.frankenphp_update_thread_context(C.bool(isWorker), C.bool(isTaskWorker))
}
//export go_frankenphp_before_script_execution
func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char {
thread := phpThreads[threadIndex]

View File

@@ -0,0 +1,16 @@
<?php
require_once __DIR__ . '/../_executor.php';
return function () {
$taskCount = $_GET['count'] ?? 0;
$workerName = $_GET['worker'] ?? '';
for ($i = 0; $i < $taskCount; $i++) {
frankenphp_send_request([
'task' => "array task$i",
'worker' => $workerName,
'index' => $i,
]);
}
echo "dispatched $taskCount tasks\n";
};

View File

@@ -0,0 +1,12 @@
<?php
require_once __DIR__.'/../_executor.php';
return function () {
$taskCount = $_GET['count'] ?? 0;
$workerName = $_GET['worker'] ?? '';
for ($i = 0; $i < $taskCount; $i++) {
frankenphp_send_request("task$i", $workerName);
}
echo "dispatched $taskCount tasks\n";
};

14
testdata/tasks/task-worker.php vendored Normal file
View File

@@ -0,0 +1,14 @@
<?php
$handleFunc = function ($task) {
var_dump($task);
echo $_SERVER['CUSTOM_VAR'] ?? 'no custom var';
return "task completed: ".$task;
};
$maxTasksBeforeRestarting = 1000;
$currentTask = 0;
while(frankenphp_handle_request($handleFunc) && $currentTask++ < $maxTasksBeforeRestarting) {
// Keep handling tasks until there are no more tasks or the max limit is reached
}

View File

@@ -35,6 +35,7 @@ func (handler *regularThread) beforeScriptExecution() string {
return handler.thread.transitionToNewHandler()
case stateTransitionComplete:
handler.state.set(stateReady)
handler.thread.updateContext(false, false)
return handler.waitForRequest()
case stateReady:
return handler.waitForRequest()

View File

@@ -1,90 +0,0 @@
package frankenphp
import (
"sync"
)
// representation of a thread that handles tasks directly assigned by go
// implements the threadHandler interface
type taskThread struct {
thread *phpThread
execChan chan *task
}
// task callbacks will be executed directly on the PHP thread
// therefore having full access to the PHP runtime
type task struct {
callback func()
done sync.Mutex
}
func newTask(cb func()) *task {
t := &task{callback: cb}
t.done.Lock()
return t
}
func (t *task) waitForCompletion() {
t.done.Lock()
}
func convertToTaskThread(thread *phpThread) *taskThread {
handler := &taskThread{
thread: thread,
execChan: make(chan *task),
}
thread.setHandler(handler)
return handler
}
func (handler *taskThread) beforeScriptExecution() string {
thread := handler.thread
switch thread.state.get() {
case stateTransitionRequested:
return thread.transitionToNewHandler()
case stateBooting, stateTransitionComplete:
thread.state.set(stateReady)
handler.waitForTasks()
return handler.beforeScriptExecution()
case stateReady:
handler.waitForTasks()
return handler.beforeScriptExecution()
case stateShuttingDown:
// signal to stop
return ""
}
panic("unexpected state: " + thread.state.name())
}
func (handler *taskThread) afterScriptExecution(int) {
panic("task threads should not execute scripts")
}
func (handler *taskThread) getRequestContext() *frankenPHPContext {
return nil
}
func (handler *taskThread) name() string {
return "Task PHP Thread"
}
func (handler *taskThread) waitForTasks() {
for {
select {
case task := <-handler.execChan:
task.callback()
task.done.Unlock() // unlock the task to signal completion
case <-handler.thread.drainChan:
// thread is shutting down, do not execute the function
return
}
}
}
func (handler *taskThread) execute(t *task) {
handler.execChan <- t
}

280
threadtaskworker.go Normal file
View File

@@ -0,0 +1,280 @@
package frankenphp
// #include "frankenphp.h"
// #include <php_variables.h>
import "C"
import (
"errors"
"github.com/dunglas/frankenphp/internal/fastabs"
"sync"
"unsafe"
)
type taskWorker struct {
threads []*phpThread
threadMutex sync.RWMutex
fileName string
taskChan chan *pendingTask
name string
num int
env PreparedEnv
}
// representation of a thread that handles tasks directly assigned by go or via frankenphp_send_request()
// can also just execute a script in a loop
// implements the threadHandler interface
type taskWorkerThread struct {
thread *phpThread
taskWorker *taskWorker
dummyContext *frankenPHPContext
currentTask *pendingTask
}
var taskWorkers []*taskWorker
// EXPERIMENTAL: a task dispatched to a task worker
type pendingTask struct {
message any // the argument passed to frankenphp_send_request() or the return value of frankenphp_handle_request()
done sync.RWMutex
callback func() // optional callback for direct execution (tests)
}
func initTaskWorkers(opts []workerOpt) error {
taskWorkers = make([]*taskWorker, 0, len(opts))
ready := sync.WaitGroup{}
for _, opt := range opts {
fileName, err := fastabs.FastAbs(opt.fileName)
if err != nil {
return err
}
if opt.maxQueueLen <= 0 {
opt.maxQueueLen = 10000 // default queue len, TODO: unlimited?
}
tw := &taskWorker{
threads: make([]*phpThread, 0, opt.num),
fileName: fileName,
taskChan: make(chan *pendingTask, opt.maxQueueLen),
name: opt.name,
num: opt.num,
env: opt.env,
}
taskWorkers = append(taskWorkers, tw)
// start the actual PHP threads
ready.Add(tw.num)
for i := 0; i < tw.num; i++ {
thread := getInactivePHPThread()
convertToTaskWorkerThread(thread, tw)
go func(thread *phpThread) {
thread.state.waitFor(stateReady)
ready.Done()
}(thread)
}
}
ready.Wait()
return nil
}
func drainTaskWorkers() {
for _, tw := range taskWorkers {
tw.drainQueue()
}
}
func convertToTaskWorkerThread(thread *phpThread, tw *taskWorker) *taskWorkerThread {
handler := &taskWorkerThread{
thread: thread,
taskWorker: tw,
}
thread.setHandler(handler)
return handler
}
func (handler *taskWorkerThread) beforeScriptExecution() string {
thread := handler.thread
switch thread.state.get() {
case stateTransitionRequested:
handler.taskWorker.detach(thread)
return thread.transitionToNewHandler()
case stateBooting, stateTransitionComplete:
tw := handler.taskWorker
tw.threadMutex.Lock()
tw.threads = append(tw.threads, thread)
tw.threadMutex.Unlock()
thread.state.set(stateReady)
thread.updateContext(false, true)
return handler.setupWorkerScript()
case stateReady:
return handler.setupWorkerScript()
case stateRestarting:
thread.state.set(stateYielding)
thread.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateShuttingDown:
handler.taskWorker.detach(thread)
// signal to stop
return ""
}
panic("unexpected state: " + thread.state.name())
}
func (handler *taskWorkerThread) setupWorkerScript() string {
fc, err := newDummyContext(handler.taskWorker.fileName, WithRequestPreparedEnv(handler.taskWorker.env))
if err != nil {
panic(err)
}
handler.dummyContext = fc
clearSandboxedEnv(handler.thread)
return handler.taskWorker.fileName
}
func (handler *taskWorkerThread) afterScriptExecution(int) {
// restart the script
}
func (handler *taskWorkerThread) getRequestContext() *frankenPHPContext {
return handler.dummyContext
}
func (handler *taskWorkerThread) name() string {
return "Task Worker PHP Thread - " + handler.taskWorker.fileName
}
func (tw *taskWorker) detach(thread *phpThread) {
tw.threadMutex.Lock()
defer tw.threadMutex.Unlock()
for i, t := range tw.threads {
if t == thread {
tw.threads = append(tw.threads[:i], tw.threads[i+1:]...)
return
}
}
}
// make sure all tasks are done by re-queuing them until the channel is empty
func (tw *taskWorker) drainQueue() {
for {
select {
case pt := <-tw.taskChan:
tw.taskChan <- pt
pt.done.RLock() // wait for completion
default:
return
}
}
}
func (tw *taskWorker) dispatch(t *pendingTask) error {
t.done.Lock()
select {
case tw.taskChan <- t:
return nil
default:
return errors.New("Task worker queue is full, cannot dispatch task: " + tw.name)
}
}
func getTaskWorkerByName(name string) *taskWorker {
for _, w := range taskWorkers {
if w.name == name {
return w
}
}
return nil
}
//export go_frankenphp_worker_handle_task
func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) *C.zval {
thread := phpThreads[threadIndex]
handler, _ := thread.handler.(*taskWorkerThread)
thread.Unpin()
thread.state.markAsWaiting(true)
select {
case task := <-handler.taskWorker.taskChan:
handler.currentTask = task
thread.state.markAsWaiting(false)
// if the task has a callback, execute it (see types_test.go)
if task.callback != nil {
task.callback()
go_frankenphp_finish_task(threadIndex, nil)
return go_frankenphp_worker_handle_task(threadIndex)
}
zval := phpValue(task.message)
task.message = nil // free memory
thread.Pin(unsafe.Pointer(zval)) // TODO: refactor types.go so no pinning is required
return zval
case <-handler.thread.drainChan:
thread.state.markAsWaiting(false)
// send an empty task to drain the thread
return nil
}
}
//export go_frankenphp_finish_task
func go_frankenphp_finish_task(threadIndex C.uintptr_t, zv *C.zval) {
thread := phpThreads[threadIndex]
handler, ok := thread.handler.(*taskWorkerThread)
if !ok {
panic("thread is not a task thread: " + thread.handler.name())
}
if zv != nil {
result, err := goValue[any](zv)
if err != nil {
panic("failed to convert go_frankenphp_finish_task() return value: " + err.Error())
}
handler.currentTask.message = result
}
handler.currentTask.done.Unlock()
handler.currentTask = nil
}
//export go_frankenphp_send_request
func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.char, nameLen C.size_t) *C.char {
if zv == nil {
return phpThreads[threadIndex].pinCString("Task argument cannot be null")
}
var tw *taskWorker
if nameLen != 0 {
tw = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen)))
} else if len(taskWorkers) != 0 {
tw = taskWorkers[0]
}
if tw == nil {
return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen)))
}
// convert the argument of frankenphp_send_request() to a Go value
goArg, err := goValue[any](zv)
if err != nil {
return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error())
}
err = tw.dispatch(&pendingTask{message: goArg})
if err != nil {
return phpThreads[threadIndex].pinCString(err.Error())
}
return nil
}

89
threadtaskworker_test.go Normal file
View File

@@ -0,0 +1,89 @@
package frankenphp
import (
"bytes"
"log/slog"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opts ...RequestOption) {
t.Helper()
r := httptest.NewRequest("GET", url, nil)
w := httptest.NewRecorder()
req, err := NewRequestWithContext(r, opts...)
assert.NoError(t, err)
assert.NoError(t, ServeHTTP(w, req))
assert.Contains(t, w.Body.String(), expectedBodyContains)
}
func TestDispatchToTaskWorkerFromWorker(t *testing.T) {
var buf bytes.Buffer
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker1", "./testdata/tasks/task-dispatcher-string.php", 1),
WithNumThreads(3), // regular thread, task worker thread, dispatcher threads
WithLogger(logger),
))
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-string.php?count=4", "dispatched 4 tasks")
// wait and shutdown to ensure all logs are flushed
time.Sleep(10 * time.Millisecond)
Shutdown()
// task output appears in logs at info level
logOutput := buf.String()
assert.Contains(t, logOutput, "task0")
assert.Contains(t, logOutput, "task1")
assert.Contains(t, logOutput, "task2")
assert.Contains(t, logOutput, "task3")
}
func TestDispatchArrayToTaskWorker(t *testing.T) {
var buf bytes.Buffer
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-dispatcher-array.php", 1),
WithNumThreads(3), // regular thread, task worker thread, dispatcher thread
WithLogger(logger),
))
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-array.php?count=1", "dispatched 1 tasks")
// wait and shutdown to ensure all logs are flushed
time.Sleep(10 * time.Millisecond)
Shutdown()
// task output appears in logs at info level
logOutput := buf.String()
assert.Contains(t, logOutput, "array task0")
}
func TestDispatchToMultipleWorkers(t *testing.T) {
var buf bytes.Buffer
handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithNumThreads(4),
WithLogger(logger),
))
defer Shutdown()
script := "http://example.com/testdata/tasks/task-dispatcher-string.php"
assertGetRequest(t, script+"?count=1&worker=worker1", "dispatched 1 tasks")
assertGetRequest(t, script+"?count=1&worker=worker2", "dispatched 1 tasks")
assertGetRequest(t, script+"?count=1&worker=worker3", "No worker found to handle this task") // fail
}

View File

@@ -62,6 +62,7 @@ func (handler *workerThread) beforeScriptExecution() string {
if handler.externalWorker != nil {
handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex)
}
handler.thread.updateContext(true, false)
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:

View File

@@ -31,6 +31,8 @@ void __zval_double__(zval *zv, double val) { ZVAL_DOUBLE(zv, val); }
void __zval_string__(zval *zv, zend_string *str) { ZVAL_STR(zv, str); }
void __zval_empty_string__(zval *zv) { ZVAL_EMPTY_STRING(zv); }
void __zval_arr__(zval *zv, zend_array *arr) { ZVAL_ARR(zv, arr); }
zend_array *__zend_new_array__(uint32_t size) { return zend_new_array(size); }

View File

@@ -263,7 +263,6 @@ func goValue[T any](zval *C.zval) (res T, err error) {
resZero T
)
t := C.zval_get_type(zval)
switch t {
case C.IS_NULL:
resAny = any(nil)
@@ -382,6 +381,10 @@ func phpValue(value any) *C.zval {
case float64:
C.__zval_double__(&zval, C.double(v))
case string:
if v == "" {
C.__zval_empty_string__(&zval)
break
}
str := (*C.zend_string)(PHPString(v, false))
C.__zval_string__(&zval, str)
case map[string]any:
@@ -435,3 +438,13 @@ func extractZvalValue(zval *C.zval, expectedType C.uint8_t) (unsafe.Pointer, err
return nil, fmt.Errorf("unsupported zval type %d", expectedType)
}
// used for cleanup in tests
func zvalPtrDtor(p unsafe.Pointer) {
C.zval_ptr_dtor((*C.zval)(p))
}
// used for cleanup in tests
func zendStringRelease(p unsafe.Pointer) {
C.zend_string_release((*C.zend_string)(p))
}

View File

@@ -19,6 +19,7 @@ void __zval_bool__(zval *zv, bool val);
void __zval_long__(zval *zv, zend_long val);
void __zval_double__(zval *zv, double val);
void __zval_string__(zval *zv, zend_string *str);
void __zval_empty_string__(zval *zv);
void __zval_arr__(zval *zv, zend_array *arr);
zend_array *__zend_new_array__(uint32_t size);

View File

@@ -1,37 +1,50 @@
package frankenphp
import (
"io"
"errors"
"log/slog"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/exp/zapslog"
"go.uber.org/zap/zaptest"
)
// execute the function on a PHP thread directly
// this is necessary if tests make use of PHP's internal allocation
func testOnDummyPHPThread(t *testing.T, test func()) {
func testOnDummyPHPThread(t *testing.T, cb func()) {
t.Helper()
logger = slog.New(slog.NewTextHandler(io.Discard, nil))
_, err := initPHPThreads(1, 1, nil) // boot 1 thread
assert.NoError(t, err)
handler := convertToTaskThread(phpThreads[0])
logger = slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))
assert.NoError(t, Init(
WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithNumThreads(2),
WithLogger(logger),
))
defer Shutdown()
task := newTask(test)
handler.execute(task)
task.waitForCompletion()
assert.NoError(t, executeOnPHPThread(cb, "tw"))
}
drainPHPThreads()
// executeOnPHPThread executes the callback func() directly on a task worker thread
// useful for testing purposes when dealing with PHP allocations
func executeOnPHPThread(callback func(), taskWorkerName string) error {
tw := getTaskWorkerByName(taskWorkerName)
if tw == nil {
return errors.New("no task worker found with name " + taskWorkerName)
}
return tw.dispatch(&pendingTask{callback: callback})
}
func TestGoString(t *testing.T) {
testOnDummyPHPThread(t, func() {
originalString := "Hello, World!"
convertedString := GoString(PHPString(originalString, false))
phpString := PHPString(originalString, false)
defer zendStringRelease(phpString)
assert.Equal(t, originalString, convertedString, "string -> zend_string -> string should yield an equal string")
assert.Equal(t, originalString, GoString(phpString), "string -> zend_string -> string should yield an equal string")
})
}
@@ -42,7 +55,9 @@ func TestPHPMap(t *testing.T) {
"foo2": "bar2",
}
convertedMap, err := GoMap[string](PHPMap(originalMap))
phpArray := PHPMap(originalMap)
defer zvalPtrDtor(phpArray)
convertedMap, err := GoMap[string](phpArray)
require.NoError(t, err)
assert.Equal(t, originalMap, convertedMap, "associative array should be equal after conversion")
@@ -59,7 +74,9 @@ func TestOrderedPHPAssociativeArray(t *testing.T) {
Order: []string{"foo2", "foo1"},
}
convertedArray, err := GoAssociativeArray[string](PHPAssociativeArray(originalArray))
phpArray := PHPAssociativeArray(originalArray)
defer zvalPtrDtor(phpArray)
convertedArray, err := GoAssociativeArray[string](phpArray)
require.NoError(t, err)
assert.Equal(t, originalArray, convertedArray, "associative array should be equal after conversion")
@@ -70,7 +87,9 @@ func TestPHPPackedArray(t *testing.T) {
testOnDummyPHPThread(t, func() {
originalSlice := []string{"bar1", "bar2"}
convertedSlice, err := GoPackedArray[string](PHPPackedArray(originalSlice))
phpArray := PHPPackedArray(originalSlice)
defer zvalPtrDtor(phpArray)
convertedSlice, err := GoPackedArray[string](phpArray)
require.NoError(t, err)
assert.Equal(t, originalSlice, convertedSlice, "slice should be equal after conversion")
@@ -85,7 +104,9 @@ func TestPHPPackedArrayToGoMap(t *testing.T) {
"1": "bar2",
}
convertedMap, err := GoMap[string](PHPPackedArray(originalSlice))
phpArray := PHPPackedArray(originalSlice)
defer zvalPtrDtor(phpArray)
convertedMap, err := GoMap[string](phpArray)
require.NoError(t, err)
assert.Equal(t, expectedMap, convertedMap, "convert a packed to an associative array")
@@ -103,7 +124,9 @@ func TestPHPAssociativeArrayToPacked(t *testing.T) {
}
expectedSlice := []string{"bar1", "bar2"}
convertedSlice, err := GoPackedArray[string](PHPAssociativeArray(originalArray))
phpArray := PHPAssociativeArray(originalArray)
defer zvalPtrDtor(phpArray)
convertedSlice, err := GoPackedArray[string](phpArray)
require.NoError(t, err)
assert.Equal(t, expectedSlice, convertedSlice, "convert an associative array to a slice")
@@ -126,7 +149,9 @@ func TestNestedMixedArray(t *testing.T) {
},
}
convertedArray, err := GoMap[any](PHPMap(originalArray))
phpArray := PHPMap(originalArray)
defer zvalPtrDtor(phpArray)
convertedArray, err := GoMap[any](phpArray)
require.NoError(t, err)
assert.Equal(t, originalArray, convertedArray, "nested mixed array should be equal after conversion")

View File

@@ -9,7 +9,6 @@ import (
"time"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/dunglas/frankenphp/internal/watcher"
)
// represents a worker script and can have many threads assigned to it
@@ -26,15 +25,12 @@ type worker struct {
}
var (
workers []*worker
watcherIsEnabled bool
workers []*worker
)
func initWorkers(opt []workerOpt) error {
workers = make([]*worker, 0, len(opt))
workersReady := sync.WaitGroup{}
directoriesToWatch := getDirectoriesToWatch(opt)
watcherIsEnabled = len(directoriesToWatch) > 0
for _, o := range opt {
w, err := newWorker(o)
@@ -64,15 +60,6 @@ func initWorkers(opt []workerOpt) error {
workersReady.Wait()
if !watcherIsEnabled {
return nil
}
watcherIsEnabled = true
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
return err
}
return nil
}
@@ -144,36 +131,38 @@ func DrainWorkers() {
func drainWorkerThreads() []*phpThread {
ready := sync.WaitGroup{}
drainedThreads := make([]*phpThread, 0)
threadsToDrain := make([]*phpThread, 0)
for _, worker := range workers {
worker.threadMutex.RLock()
ready.Add(len(worker.threads))
for _, thread := range worker.threads {
if !thread.state.requestSafeStateChange(stateRestarting) {
ready.Done()
// no state change allowed == thread is shutting down
// we'll proceed to restart all other threads anyways
continue
}
close(thread.drainChan)
drainedThreads = append(drainedThreads, thread)
go func(thread *phpThread) {
thread.state.waitFor(stateYielding)
ready.Done()
}(thread)
}
threadsToDrain = append(threadsToDrain, worker.threads...)
worker.threadMutex.RUnlock()
}
for _, taskWorker := range taskWorkers {
taskWorker.threadMutex.RLock()
threadsToDrain = append(threadsToDrain, taskWorker.threads...)
taskWorker.threadMutex.RUnlock()
}
for _, thread := range threadsToDrain {
if !thread.state.requestSafeStateChange(stateRestarting) {
// no state change allowed == thread is shutting down
// we'll proceed to restart all other threads anyways
continue
}
ready.Add(1)
close(thread.drainChan)
drainedThreads = append(drainedThreads, thread)
go func(thread *phpThread) {
thread.state.waitFor(stateYielding)
ready.Done()
}(thread)
}
ready.Wait()
return drainedThreads
}
func drainWatcher() {
if watcherIsEnabled {
watcher.DrainWatcher()
}
}
// RestartWorkers attempts to restart all workers gracefully
func RestartWorkers() {
// disallow scaling threads while restarting workers
@@ -188,14 +177,6 @@ func RestartWorkers() {
}
}
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
directoriesToWatch := []string{}
for _, w := range workerOpts {
directoriesToWatch = append(directoriesToWatch, w.watch...)
}
return directoriesToWatch
}
func (worker *worker) attachThread(thread *phpThread) {
worker.threadMutex.Lock()
worker.threads = append(worker.threads, thread)
@@ -213,14 +194,6 @@ func (worker *worker) detachThread(thread *phpThread) {
worker.threadMutex.Unlock()
}
func (worker *worker) countThreads() int {
worker.threadMutex.RLock()
l := len(worker.threads)
worker.threadMutex.RUnlock()
return l
}
func (worker *worker) handleRequest(fc *frankenPHPContext) {
metrics.StartWorkerRequest(worker.name)