Throws on task handling failure.

This commit is contained in:
Alliballibaba
2025-09-17 22:10:39 +02:00
parent 7f52e2d116
commit 9c4cf7e2d8
9 changed files with 22 additions and 25 deletions

View File

@@ -133,7 +133,7 @@ func (f *FrankenPHPApp) Start() error {
workerOpts := []frankenphp.WorkerOption{
frankenphp.WithWorkerEnv(tw.Env),
frankenphp.WithWorkerWatchMode(tw.Watch),
frankenphp.WithTaskWorker(true),
frankenphp.AsTaskWorker(true),
}
opts = append(opts, frankenphp.WithWorkers(tw.Name, repl.ReplaceKnown(tw.FileName, ""), tw.Num, workerOpts...))

View File

@@ -547,13 +547,18 @@ PHP_FUNCTION(frankenphp_dispatch_task) {
/* copy the task string so other threads can use it */
char *task_copy =
pemalloc(task_len, 1); /* free in frankenphp_handle_task() */
pemalloc(task_len, 1); /* freed in frankenphp_handle_task() */
memcpy(task_copy, task_string, task_len);
go_frankenphp_worker_dispatch_task(0, task_copy, task_len, worker_name,
bool success = go_frankenphp_worker_dispatch_task(0, task_copy, task_len, worker_name,
worker_name_len);
RETURN_TRUE;
if (!success) {
pefree(task_copy, 1);
// throw
zend_throw_exception(spl_ce_RuntimeException,
"No worker found to handle the task", 0);
RETURN_THROWS();
}
}
PHP_FUNCTION(headers_send) {

View File

@@ -54,7 +54,8 @@ var (
ErrScriptExecution = errors.New("error during PHP script execution")
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
isRunning bool
isRunning bool
watcherIsEnabled bool
loggerMu sync.RWMutex
logger *slog.Logger

View File

@@ -6,7 +6,7 @@ function frankenphp_handle_request(callable $callback): bool {}
function frankenphp_handle_task(callable $callback): bool {}
function frankenphp_dispatch_task(string $task, ?string $workerName = null): bool {}
function frankenphp_dispatch_task(string $task, ?string $workerName = null) {}
function headers_send(int $status = 200): int {}

View File

@@ -12,7 +12,7 @@ ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_dispatch_task, 0, 1,
_IS_BOOL, 0)
IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, task, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, worker_name, IS_STRING, 0, "null")
ZEND_END_ARG_INFO()

View File

@@ -148,8 +148,8 @@ func WithMaxWaitTime(maxWaitTime time.Duration) Option {
}
}
// EXPERMIENTAL: WithTaskWorker configures the worker as a task worker instead.
func WithTaskWorker(isTaskWorker bool) WorkerOption {
// EXPERMIENTAL: AsTaskWorker configures the worker as a task worker instead.
func AsTaskWorker(isTaskWorker bool) WorkerOption {
return func(w *workerOpt) error {
w.isTaskWorker = isTaskWorker
return nil

View File

@@ -141,15 +141,7 @@ func (handler *taskWorkerThread) name() string {
//export go_frankenphp_worker_handle_task
func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) C.go_string {
thread := phpThreads[threadIndex]
handler, ok := thread.handler.(*taskWorkerThread)
if !ok {
panic("thread is not a task thread")
}
if !thread.state.is(stateReady) {
thread.state.set(stateReady)
}
handler, _ := thread.handler.(*taskWorkerThread)
thread.state.markAsWaiting(true)
select {
@@ -180,7 +172,7 @@ func go_frankenphp_finish_task(threadIndex C.uintptr_t) {
func go_frankenphp_worker_dispatch_task(taskWorkerIndex C.uintptr_t, taskChar *C.char, taskLen C.size_t, name *C.char, nameLen C.size_t) C.bool {
var worker *taskWorker
if name != nil {
name := C.GoStringN(name, C.int(nameLen))
name := C.GoStringN(name, C.int(nameLen)) // TODO: avoid copy
for _, w := range taskWorkers {
if w.name == name {
worker = w
@@ -192,7 +184,7 @@ func go_frankenphp_worker_dispatch_task(taskWorkerIndex C.uintptr_t, taskChar *C
}
if worker == nil {
logger.Error("task worker does not exist", "name", C.GoStringN(name, C.int(nameLen)))
logger.Error("no task worker found to handle this task", "name", C.GoStringN(name, C.int(nameLen)))
return C.bool(false)
}
@@ -203,7 +195,7 @@ func go_frankenphp_worker_dispatch_task(taskWorkerIndex C.uintptr_t, taskChar *C
// dispatch immediately if available (best performance)
select {
case taskWorkers[taskWorkerIndex].taskChan <- task:
return C.bool(false)
return C.bool(true)
default:
}

View File

@@ -26,7 +26,7 @@ func TestDispatchWorkToTaskWorker(t *testing.T) {
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("worker", "testdata/tasks/task-worker.php", 1, WithTaskWorkerMode(true)),
WithWorkers("worker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
WithNumThreads(3),
WithLogger(logger),
))

View File

@@ -25,8 +25,7 @@ type worker struct {
}
var (
workers []*worker
watcherIsEnabled bool
workers []*worker
)
func initWorkers(opt []workerOpt) error {