mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-09-26 19:41:13 +08:00
Allows direct execution on tasks and correctly frees in types_test.
This commit is contained in:
@@ -1,90 +0,0 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// representation of a thread that handles tasks directly assigned by go
|
||||
// implements the threadHandler interface
|
||||
type taskThread struct {
|
||||
thread *phpThread
|
||||
execChan chan *task
|
||||
}
|
||||
|
||||
// task callbacks will be executed directly on the PHP thread
|
||||
// therefore having full access to the PHP runtime
|
||||
type task struct {
|
||||
callback func()
|
||||
done sync.Mutex
|
||||
}
|
||||
|
||||
func newTask(cb func()) *task {
|
||||
t := &task{callback: cb}
|
||||
t.done.Lock()
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *task) waitForCompletion() {
|
||||
t.done.Lock()
|
||||
}
|
||||
|
||||
func convertToTaskThread(thread *phpThread) *taskThread {
|
||||
handler := &taskThread{
|
||||
thread: thread,
|
||||
execChan: make(chan *task),
|
||||
}
|
||||
thread.setHandler(handler)
|
||||
return handler
|
||||
}
|
||||
|
||||
func (handler *taskThread) beforeScriptExecution() string {
|
||||
thread := handler.thread
|
||||
|
||||
switch thread.state.get() {
|
||||
case stateTransitionRequested:
|
||||
return thread.transitionToNewHandler()
|
||||
case stateBooting, stateTransitionComplete:
|
||||
thread.state.set(stateReady)
|
||||
handler.waitForTasks()
|
||||
|
||||
return handler.beforeScriptExecution()
|
||||
case stateReady:
|
||||
handler.waitForTasks()
|
||||
|
||||
return handler.beforeScriptExecution()
|
||||
case stateShuttingDown:
|
||||
// signal to stop
|
||||
return ""
|
||||
}
|
||||
panic("unexpected state: " + thread.state.name())
|
||||
}
|
||||
|
||||
func (handler *taskThread) afterScriptExecution(int) {
|
||||
panic("task threads should not execute scripts")
|
||||
}
|
||||
|
||||
func (handler *taskThread) getRequestContext() *frankenPHPContext {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (handler *taskThread) name() string {
|
||||
return "Task PHP Thread"
|
||||
}
|
||||
|
||||
func (handler *taskThread) waitForTasks() {
|
||||
for {
|
||||
select {
|
||||
case task := <-handler.execChan:
|
||||
task.callback()
|
||||
task.done.Unlock() // unlock the task to signal completion
|
||||
case <-handler.thread.drainChan:
|
||||
// thread is shutting down, do not execute the function
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *taskThread) execute(t *task) {
|
||||
handler.execChan <- t
|
||||
}
|
@@ -35,9 +35,10 @@ var taskWorkers []*taskWorker
|
||||
|
||||
// EXPERIMENTAL: a task dispatched to a task worker
|
||||
type PendingTask struct {
|
||||
str *C.char
|
||||
len C.size_t
|
||||
done sync.RWMutex
|
||||
str *C.char
|
||||
len C.size_t
|
||||
done sync.RWMutex
|
||||
callback func()
|
||||
}
|
||||
|
||||
func (t *PendingTask) WaitForCompletion() {
|
||||
@@ -59,6 +60,21 @@ func DispatchTask(task string, workerName string) (*PendingTask, error) {
|
||||
return pt, nil
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: ExecuteTask executes the callback func() directly on a task worker thread
|
||||
func ExecuteTask(callback func(), workerName string) (*PendingTask, error) {
|
||||
tw := getTaskWorkerByName(workerName)
|
||||
if tw == nil {
|
||||
return nil, errors.New("no task worker found with name " + workerName)
|
||||
}
|
||||
|
||||
pt := &PendingTask{callback: callback}
|
||||
pt.done.Lock()
|
||||
|
||||
tw.taskChan <- pt
|
||||
|
||||
return pt, nil
|
||||
}
|
||||
|
||||
func initTaskWorkers(opts []workerOpt) error {
|
||||
taskWorkers = make([]*taskWorker, 0, len(opts))
|
||||
for _, opt := range opts {
|
||||
@@ -167,7 +183,7 @@ func (handler *taskWorkerThread) setupWorkerScript() string {
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) afterScriptExecution(int) {
|
||||
// potential place for cleanup after task execution
|
||||
// restart the script
|
||||
}
|
||||
|
||||
func (handler *taskWorkerThread) getRequestContext() *frankenPHPContext {
|
||||
@@ -210,6 +226,17 @@ func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) C.go_string {
|
||||
case task := <-handler.taskWorker.taskChan:
|
||||
handler.currentTask = task
|
||||
thread.state.markAsWaiting(false)
|
||||
|
||||
// if the task has a callback, handle it directly
|
||||
// callbacks may call into C (C -> GO -> C)
|
||||
if task.callback != nil {
|
||||
task.callback()
|
||||
go_frankenphp_finish_task(threadIndex)
|
||||
|
||||
return go_frankenphp_worker_handle_task(threadIndex)
|
||||
}
|
||||
|
||||
// if the task has no callback, forward it to PHP
|
||||
return C.go_string{len: task.len, data: task.str}
|
||||
case <-handler.thread.drainChan:
|
||||
thread.state.markAsWaiting(false)
|
||||
|
10
types.go
10
types.go
@@ -313,3 +313,13 @@ func castZval(zval *C.zval, expectedType C.uint8_t) unsafe.Pointer {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func zvalPtrDtor(p unsafe.Pointer) {
|
||||
zv := (*C.zval)(p)
|
||||
C.zval_ptr_dtor(zv)
|
||||
}
|
||||
|
||||
func zendStringRelease(p unsafe.Pointer) {
|
||||
zs := (*C.zend_string)(p)
|
||||
C.zend_string_release(zs)
|
||||
}
|
||||
|
@@ -1,36 +1,40 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap/exp/zapslog"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
// execute the function on a PHP thread directly
|
||||
// this is necessary if tests make use of PHP's internal allocation
|
||||
func testOnDummyPHPThread(t *testing.T, test func()) {
|
||||
func testOnDummyPHPThread(t *testing.T, cb func()) {
|
||||
t.Helper()
|
||||
logger = slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
_, err := initPHPThreads(1, 1, nil) // boot 1 thread
|
||||
logger = slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))
|
||||
assert.NoError(t, Init(
|
||||
WithWorkers("tw", "./testdata/tasks/task-worker.php", 1, AsTaskWorker(true)),
|
||||
WithNumThreads(2),
|
||||
WithLogger(logger),
|
||||
))
|
||||
defer Shutdown()
|
||||
|
||||
task, err := ExecuteTask(cb, "tw")
|
||||
assert.NoError(t, err)
|
||||
handler := convertToTaskThread(phpThreads[0])
|
||||
|
||||
task := newTask(test)
|
||||
handler.execute(task)
|
||||
task.waitForCompletion()
|
||||
|
||||
drainPHPThreads()
|
||||
task.WaitForCompletion()
|
||||
}
|
||||
|
||||
func TestGoString(t *testing.T) {
|
||||
testOnDummyPHPThread(t, func() {
|
||||
originalString := "Hello, World!"
|
||||
|
||||
convertedString := GoString(PHPString(originalString, false))
|
||||
phpString := PHPString(originalString, false)
|
||||
defer zendStringRelease(phpString)
|
||||
|
||||
assert.Equal(t, originalString, convertedString, "string -> zend_string -> string should yield an equal string")
|
||||
assert.Equal(t, originalString, GoString(phpString), "string -> zend_string -> string should yield an equal string")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -41,9 +45,10 @@ func TestPHPMap(t *testing.T) {
|
||||
"foo2": "bar2",
|
||||
}
|
||||
|
||||
convertedMap := GoMap(PHPMap(originalMap))
|
||||
phpArray := PHPMap(originalMap)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, originalMap, convertedMap, "associative array should be equal after conversion")
|
||||
assert.Equal(t, originalMap, GoMap(phpArray), "associative array should be equal after conversion")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -57,9 +62,10 @@ func TestOrderedPHPAssociativeArray(t *testing.T) {
|
||||
Order: []string{"foo2", "foo1"},
|
||||
}
|
||||
|
||||
convertedArray := GoAssociativeArray(PHPAssociativeArray(originalArray))
|
||||
phpArray := PHPAssociativeArray(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, originalArray, convertedArray, "associative array should be equal after conversion")
|
||||
assert.Equal(t, originalArray, GoAssociativeArray(phpArray), "associative array should be equal after conversion")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -67,9 +73,10 @@ func TestPHPPackedArray(t *testing.T) {
|
||||
testOnDummyPHPThread(t, func() {
|
||||
originalSlice := []any{"bar1", "bar2"}
|
||||
|
||||
convertedSlice := GoPackedArray(PHPPackedArray(originalSlice))
|
||||
phpArray := PHPPackedArray(originalSlice)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, originalSlice, convertedSlice, "slice should be equal after conversion")
|
||||
assert.Equal(t, originalSlice, GoPackedArray(phpArray), "slice should be equal after conversion")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -81,9 +88,10 @@ func TestPHPPackedArrayToGoMap(t *testing.T) {
|
||||
"1": "bar2",
|
||||
}
|
||||
|
||||
convertedMap := GoMap(PHPPackedArray(originalSlice))
|
||||
phpArray := PHPPackedArray(originalSlice)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, expectedMap, convertedMap, "convert a packed to an associative array")
|
||||
assert.Equal(t, expectedMap, GoMap(phpArray), "convert a packed to an associative array")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -98,9 +106,10 @@ func TestPHPAssociativeArrayToPacked(t *testing.T) {
|
||||
}
|
||||
expectedSlice := []any{"bar1", "bar2"}
|
||||
|
||||
convertedSlice := GoPackedArray(PHPAssociativeArray(originalArray))
|
||||
phpArray := PHPAssociativeArray(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, expectedSlice, convertedSlice, "convert an associative array to a slice")
|
||||
assert.Equal(t, expectedSlice, GoPackedArray(phpArray), "convert an associative array to a slice")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -120,8 +129,9 @@ func TestNestedMixedArray(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
convertedArray := GoMap(PHPMap(originalArray))
|
||||
phpArray := PHPMap(originalArray)
|
||||
defer zvalPtrDtor(phpArray)
|
||||
|
||||
assert.Equal(t, originalArray, convertedArray, "nested mixed array should be equal after conversion")
|
||||
assert.Equal(t, originalArray, GoMap(phpArray), "nested mixed array should be equal after conversion")
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user