diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index 557e69b4..29398b1a 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -1428,7 +1428,7 @@ func TestServerWithTaskWorker(t *testing.T) { tester := caddytest.NewTester(t) taskWorker1, err := fastabs.FastAbs("../testdata/tasks/task-worker.php") require.NoError(t, err) - taskWorker2, err := fastabs.FastAbs("../testdata/tasks/task-worker2.php") + taskWorker2, err := fastabs.FastAbs("../testdata/tasks/task-worker.php") require.NoError(t, err) tester.InitServer(` { diff --git a/testdata/tasks/task-worker2.php b/testdata/tasks/task-worker2.php deleted file mode 100644 index 389132e0..00000000 --- a/testdata/tasks/task-worker2.php +++ /dev/null @@ -1,9 +0,0 @@ - GO -> C) + // currently only here for tests, TODO: is this useful for extensions? if task.callback != nil { task.callback() go_frankenphp_finish_task(threadIndex, nil) @@ -273,7 +274,7 @@ func go_frankenphp_dispatch_task(threadIndex C.uintptr_t, zv *C.zval, name *C.ch } var worker *taskWorker - if name != nil && nameLen != 0 { + if nameLen != 0 { worker = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen))) } else if len(taskWorkers) != 0 { worker = taskWorkers[0] @@ -286,13 +287,11 @@ func go_frankenphp_dispatch_task(threadIndex C.uintptr_t, zv *C.zval, name *C.ch // create a new task and lock it until the task is done goArg := goValue(zv) task := &PendingTask{arg: goArg} - task.done.Lock() + err := task.dispatch(worker) - // dispatch to the queue - select { - case worker.taskChan <- task: - return nil - default: - return phpThreads[threadIndex].pinCString("Task worker queue is full, cannot dispatch task to worker: " + worker.name) + if err != nil { + return phpThreads[threadIndex].pinCString(err.Error()) } + + return nil } diff --git a/threadtaskworker_test.go b/threadtaskworker_test.go index a3960934..16d424f3 100644 --- a/threadtaskworker_test.go +++ b/threadtaskworker_test.go @@ -5,7 +5,6 @@ import ( "log/slog" "net/http/httptest" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -66,7 +65,6 @@ func TestDispatchToTaskWorkerFromWorker(t *testing.T) { assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-string.php?count=4", "dispatched 4 tasks") // wait and shutdown to ensure all logs are flushed - time.Sleep(10 * time.Millisecond) Shutdown() // task output appears in logs at info level @@ -92,7 +90,6 @@ func TestDispatchArrayToTaskWorker(t *testing.T) { assertGetRequest(t, "http://example.com/testdata/tasks/task-dispatcher-array.php?count=1", "dispatched 1 tasks") // wait and shutdown to ensure all logs are flushed - time.Sleep(10 * time.Millisecond) Shutdown() // task output appears in logs at info level @@ -107,7 +104,7 @@ func TestDispatchToMultipleWorkers(t *testing.T) { assert.NoError(t, Init( WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), - WithWorkers("worker2", "./testdata/tasks/task-worker2.php", 1, AsTaskWorker(true, 0)), + WithWorkers("worker2", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)), WithNumThreads(4), WithLogger(logger), )) diff --git a/types_test.go b/types_test.go index bc9fdd98..9972c7f9 100644 --- a/types_test.go +++ b/types_test.go @@ -1,6 +1,7 @@ package frankenphp import ( + "errors" "log/slog" "testing" @@ -21,12 +22,26 @@ func testOnDummyPHPThread(t *testing.T, cb func()) { )) defer Shutdown() - task, err := ExecuteCallbackOnTaskWorker(cb, "tw") + task, err := executeOnPHPThread(cb, "tw") assert.NoError(t, err) task.WaitForCompletion() } +// executeOnPHPThread executes the callback func() directly on a task worker thread +// Currently only used in tests +func executeOnPHPThread(callback func(), taskWorkerName string) (*PendingTask, error) { + tw := getTaskWorkerByName(taskWorkerName) + if tw == nil { + return nil, errors.New("no task worker found with name " + taskWorkerName) + } + + pt := &PendingTask{callback: callback} + err := pt.dispatch(tw) + + return pt, err +} + func TestGoString(t *testing.T) { testOnDummyPHPThread(t, func() { originalString := "Hello, World!"