Allows setting queue len.

This commit is contained in:
Alliballibaba
2025-10-11 22:21:02 +02:00
parent 05bf065a1b
commit 7565628516
6 changed files with 24 additions and 16 deletions

View File

@@ -133,7 +133,7 @@ func (f *FrankenPHPApp) Start() error {
workerOpts := []frankenphp.WorkerOption{
frankenphp.WithWorkerEnv(tw.Env),
frankenphp.WithWorkerWatchMode(tw.Watch),
frankenphp.AsTaskWorker(true),
frankenphp.AsTaskWorker(true, 0), // TODO: maxQueueLen configurable here?
}
opts = append(opts, frankenphp.WithWorkers(tw.Name, repl.ReplaceKnown(tw.FileName, ""), tw.Num, workerOpts...))

View File

@@ -37,6 +37,7 @@ type workerOpt struct {
watch []string
maxConsecutiveFailures int
isTaskWorker bool
maxQueueLen int
}
// WithNumThreads configures the number of PHP threads to start.
@@ -149,9 +150,10 @@ func WithMaxWaitTime(maxWaitTime time.Duration) Option {
}
// EXPERIMENTAL: AsTaskWorker configures the worker as a task worker instead.
func AsTaskWorker(isTaskWorker bool) WorkerOption {
func AsTaskWorker(isTaskWorker bool, maxQueueLen int) WorkerOption {
return func(w *workerOpt) error {
w.isTaskWorker = isTaskWorker
w.maxQueueLen = maxQueueLen
return nil
}
}

View File

@@ -31,7 +31,6 @@ type taskWorkerThread struct {
currentTask *PendingTask
}
const maxQueueLen = 1500 // TODO: make configurable somehow
var taskWorkers []*taskWorker
// EXPERIMENTAL: a task dispatched to a task worker
@@ -86,10 +85,14 @@ func initTaskWorkers(opts []workerOpt) error {
return err
}
if opt.maxQueueLen <= 0 {
opt.maxQueueLen = 10000 // default queue len, TODO: unlimited?
}
tw := &taskWorker{
threads: make([]*phpThread, 0, opt.num),
fileName: fileName,
taskChan: make(chan *PendingTask, maxQueueLen),
taskChan: make(chan *PendingTask, opt.maxQueueLen),
name: opt.name,
num: opt.num,
env: opt.env,
@@ -266,9 +269,9 @@ func go_frankenphp_finish_task(threadIndex C.uintptr_t, zv *C.zval) {
//export go_frankenphp_dispatch_task
func go_frankenphp_dispatch_task(zv *C.zval, name *C.char, nameLen C.size_t) C.bool {
if zv == nil {
logger.Error("no task argument provided")
return C.bool(false)
}
logger.Error("no task argument provided")
return C.bool(false)
}
var worker *taskWorker
if name != nil && nameLen != 0 {
@@ -283,14 +286,17 @@ func go_frankenphp_dispatch_task(zv *C.zval, name *C.char, nameLen C.size_t) C.b
}
// create a new task and lock it until the task is done
task := &PendingTask{arg: goValue(zv)}
goArg := goValue(zv)
task := &PendingTask{arg: goArg}
task.done.Lock()
// dispatch task immediately if a thread available (best performance)
// dispatch to the queue
select {
case worker.taskChan <- task:
return C.bool(true)
default:
logger.Error("task worker queue is full, cannot dispatch task", "name", worker.name, "arg", goArg)
return C.bool(false)
}
go func() {

View File

@@ -30,7 +30,7 @@ func TestDispatchToTaskWorker(t *testing.T) {
"worker",
"./testdata/tasks/task-worker.php",
1,
AsTaskWorker(true),
AsTaskWorker(true, 0),
WithWorkerEnv(PreparedEnv{"CUSTOM_VAR": "custom var"}),
),
WithNumThreads(3),
@@ -57,7 +57,7 @@ func TestDispatchToTaskWorkerFromWorker(t *testing.T) {
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker1", "./testdata/tasks/task-dispatcher-string.php", 1),
WithNumThreads(3), // regular thread, task worker thread, dispatcher threads
WithLogger(logger),
@@ -83,7 +83,7 @@ func TestDispatchArrayToTaskWorker(t *testing.T) {
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
WithWorkers("taskworker", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-dispatcher-array.php", 1),
WithNumThreads(3), // regular thread, task worker thread, dispatcher thread
WithLogger(logger),
@@ -106,8 +106,8 @@ func TestDispatchToMultipleWorkers(t *testing.T) {
logger := slog.New(handler)
assert.NoError(t, Init(
WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
WithWorkers("worker2", "./testdata/tasks/task-worker2.php", 1, AsTaskWorker(true)),
WithWorkers("worker1", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithWorkers("worker2", "./testdata/tasks/task-worker2.php", 1, AsTaskWorker(true, 0)),
WithNumThreads(4),
WithLogger(logger),
))

View File

@@ -274,7 +274,7 @@ func phpValue(value any) *C.zval {
case float64:
C.__zval_double__(&zval, C.double(v))
case string:
if (v == "") {
if v == "" {
C.__zval_empty_string__(&zval)
break
}

View File

@@ -15,7 +15,7 @@ func testOnDummyPHPThread(t *testing.T, cb func()) {
t.Helper()
logger = slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))
assert.NoError(t, Init(
WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true, 0)),
WithNumThreads(2),
WithLogger(logger),
))