Removes more code.

This commit is contained in:
Alliballibaba
2025-10-11 23:23:58 +02:00
parent 8144a06ebb
commit 03d886d32e
5 changed files with 52 additions and 50 deletions

View File

@@ -1428,7 +1428,7 @@ func TestServerWithTaskWorker(t *testing.T) {
tester := caddytest.NewTester(t)
taskWorker1, err := fastabs.FastAbs("../testdata/tasks/task-worker.php")
require.NoError(t, err)
taskWorker2, err := fastabs.FastAbs("../testdata/tasks/task-worker2.php")
taskWorker2, err := fastabs.FastAbs("../testdata/tasks/task-worker.php")
require.NoError(t, err)
tester.InitServer(`
{

View File

@@ -1,9 +0,0 @@
<?php
$handleFunc = function ($task) {
echo "$task";
};
while(frankenphp_handle_task($handleFunc)) {
// Keep handling tasks until there are no more tasks or the max limit is reached
}

View File

@@ -45,6 +45,17 @@ func (t *PendingTask) WaitForCompletion() {
t.done.RLock()
}
func (t *PendingTask) dispatch(tw *taskWorker) error {
t.done.Lock()
select {
case tw.taskChan <- t:
return nil
default:
return errors.New("Task worker queue is full, cannot dispatch task: " + tw.name)
}
}
// EXPERIMENTAL: DispatchTask dispatches a task to a named task worker
func DispatchTask(arg any, taskWorkerName string) (*PendingTask, error) {
tw := getTaskWorkerByName(taskWorkerName)
@@ -53,27 +64,9 @@ func DispatchTask(arg any, taskWorkerName string) (*PendingTask, error) {
}
pt := &PendingTask{arg: arg}
pt.done.Lock()
err := pt.dispatch(tw)
tw.taskChan <- pt
return pt, nil
}
// EXPERIMENTAL: ExecuteCallbackOnTaskWorker executes the callback func() directly on a task worker thread
// this gives the callback access to PHP's memory management
func ExecuteCallbackOnTaskWorker(callback func(), taskWorkerName string) (*PendingTask, error) {
tw := getTaskWorkerByName(taskWorkerName)
if tw == nil {
return nil, errors.New("no task worker found with name " + taskWorkerName)
}
pt := &PendingTask{callback: callback}
pt.done.Lock()
tw.taskChan <- pt
return pt, nil
return pt, err
}
func initTaskWorkers(opts []workerOpt) error {
@@ -117,12 +110,7 @@ func initTaskWorkers(opts []workerOpt) error {
func drainTaskWorkers() {
for _, tw := range taskWorkers {
select {
// make sure all tasks are done by re-queuing them until the channel is empty
case pt := <-tw.taskChan:
tw.taskChan <- pt
default:
}
tw.drainQueue()
}
}
@@ -208,6 +196,19 @@ func (tw *taskWorker) detach(thread *phpThread) {
tw.threadMutex.Unlock()
}
// make sure all tasks are done by re-queuing them until the channel is empty
func (tw *taskWorker) drainQueue() {
for {
select {
case pt := <-tw.taskChan:
tw.taskChan <- pt
pt.WaitForCompletion()
default:
return
}
}
}
func getTaskWorkerByName(name string) *taskWorker {
for _, w := range taskWorkers {
if w.name == name {
@@ -231,7 +232,7 @@ func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) *C.zval {
thread.state.markAsWaiting(false)
// if the task has a callback, handle it directly
// callbacks may call into C (C -> GO -> C)
// currently only here for tests, TODO: is this useful for extensions?
if task.callback != nil {
task.callback()
go_frankenphp_finish_task(threadIndex, nil)
@@ -273,7 +274,7 @@ func go_frankenphp_dispatch_task(threadIndex C.uintptr_t, zv *C.zval, name *C.ch
}
var worker *taskWorker
if name != nil && nameLen != 0 {
if nameLen != 0 {
worker = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen)))
} else if len(taskWorkers) != 0 {
worker = taskWorkers[0]
@@ -286,13 +287,11 @@ func go_frankenphp_dispatch_task(threadIndex C.uintptr_t, zv *C.zval, name *C.ch
// create a new task and lock it until the task is done
goArg := goValue(zv)
task := &PendingTask{arg: goArg}
task.done.Lock()
err := task.dispatch(worker)
// dispatch to the queue
select {
case worker.taskChan <- task:
return nil
default:
return phpThreads[threadIndex].pinCString("Task worker queue is full, cannot dispatch task to worker: " + worker.name)
if err != nil {
return phpThreads[threadIndex].pinCString(err.Error())
}
return nil
}

View File

@@ -5,7 +5,6 @@ import (
"log/slog"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@@ -66,7 +65,6 @@ func TestDispatchToTaskWorkerFromWorker(t *testing.T) {
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-string.php?count=4", "dispatched 4 tasks")
// wait and shutdown to ensure all logs are flushed
time.Sleep(10 * time.Millisecond)
Shutdown()
// task output appears in logs at info level
@@ -92,7 +90,6 @@ func TestDispatchArrayToTaskWorker(t *testing.T) {
assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-array.php?count=1", "dispatched 1 tasks")
// wait and shutdown to ensure all logs are flushed
time.Sleep(10 * time.Millisecond)
Shutdown()
// task output appears in logs at info level
@@ -107,7 +104,7 @@ func TestDispatchToMultipleWorkers(t *testing.T) {
assert.NoError(t, Init(
WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-worker2.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithNumThreads(4),
WithLogger(logger),
))

View File

@@ -1,6 +1,7 @@
package frankenphp
import (
"errors"
"log/slog"
"testing"
@@ -21,12 +22,26 @@ func testOnDummyPHPThread(t *testing.T, cb func()) {
))
defer Shutdown()
task, err := ExecuteCallbackOnTaskWorker(cb, "tw")
task, err := executeOnPHPThread(cb, "tw")
assert.NoError(t, err)
task.WaitForCompletion()
}
// executeOnPHPThread executes the callback func() directly on a task worker thread
// Currently only used in tests
func executeOnPHPThread(callback func(), taskWorkerName string) (*PendingTask, error) {
tw := getTaskWorkerByName(taskWorkerName)
if tw == nil {
return nil, errors.New("no task worker found with name " + taskWorkerName)
}
pt := &PendingTask{callback: callback}
err := pt.dispatch(tw)
return pt, err
}
func TestGoString(t *testing.T) {
testOnDummyPHPThread(t, func() {
originalString := "Hello, World!"