suggestion: external worker api (#1928)

* Cleaner request apis.
This commit is contained in:
Alexander Stecher
2025-10-28 20:37:20 +01:00
committed by Kévin Dunglas
parent 9b8d215727
commit 1270784cd3
7 changed files with 222 additions and 207 deletions

View File

@@ -52,7 +52,8 @@ var (
ErrScriptExecution = errors.New("error during PHP script execution")
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
isRunning bool
isRunning bool
onServerShutdown []func()
loggerMu sync.RWMutex
logger *slog.Logger
@@ -216,7 +217,7 @@ func Init(options ...Option) error {
// add registered external workers
for _, ew := range extensionWorkers {
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.MinThreads(), WithWorkerEnv(ew.Env())))
options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...))
}
opt := &opt{}
@@ -291,6 +292,17 @@ func Init(options ...Option) error {
logger.LogAttrs(ctx, slog.LevelInfo, "embedded PHP app 📦", slog.String("path", EmbeddedAppPath))
}
// register the startup/shutdown hooks (mainly useful for extensions)
onServerShutdown = nil
for _, w := range opt.workers {
if w.onServerStartup != nil {
w.onServerStartup()
}
if w.onServerShutdown != nil {
onServerShutdown = append(onServerShutdown, w.onServerShutdown)
}
}
return nil
}
@@ -300,6 +312,11 @@ func Shutdown() {
return
}
// call the shutdown hooks (mainly useful for extensions)
for _, fn := range onServerShutdown {
fn()
}
drainWatcher()
drainAutoScaling()
drainPHPThreads()

View File

@@ -35,6 +35,10 @@ type workerOpt struct {
env PreparedEnv
watch []string
maxConsecutiveFailures int
onThreadReady func(int)
onThreadShutdown func(int)
onServerStartup func()
onServerShutdown func()
}
// WithNumThreads configures the number of PHP threads to start.
@@ -116,6 +120,40 @@ func WithWorkerMaxFailures(maxFailures int) WorkerOption {
}
}
func WithWorkerOnReady(f func(int)) WorkerOption {
return func(w *workerOpt) error {
w.onThreadReady = f
return nil
}
}
func WithWorkerOnShutdown(f func(int)) WorkerOption {
return func(w *workerOpt) error {
w.onThreadShutdown = f
return nil
}
}
// WithWorkerOnServerStartup adds a function to be called right after server startup. Useful for extensions.
func WithWorkerOnServerStartup(f func()) WorkerOption {
return func(w *workerOpt) error {
w.onServerStartup = f
return nil
}
}
// WithWorkerOnServerShutdown adds a function to be called right before server shutdown. Useful for extensions.
func WithWorkerOnServerShutdown(f func()) WorkerOption {
return func(w *workerOpt) error {
w.onServerShutdown = f
return nil
}
}
// WithLogger configures the global logger to use.
func WithLogger(l *slog.Logger) Option {
return func(o *opt) error {

8
testdata/message-worker.php vendored Normal file
View File

@@ -0,0 +1,8 @@
<?php
while (frankenphp_handle_request(function ($message) {
echo $message;
return "received message: $message";
})) {
// continue handling requests
}

View File

@@ -21,13 +21,10 @@ type workerThread struct {
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *exponentialBackoff
externalWorker Worker
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
externalWorker := extensionWorkers[worker.name]
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
@@ -37,7 +34,6 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: worker.maxConsecutiveFailures,
},
externalWorker: externalWorker,
})
worker.attachThread(thread)
}
@@ -46,27 +42,27 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
if handler.externalWorker != nil {
handler.externalWorker.OnServerShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
if handler.externalWorker != nil {
handler.externalWorker.OnShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
if handler.externalWorker != nil {
handler.externalWorker.OnReady(handler.thread.threadIndex)
if handler.worker.onThreadReady != nil {
handler.worker.onThreadReady(handler.thread.threadIndex)
}
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
if handler.externalWorker != nil {
handler.externalWorker.OnServerShutdown(handler.thread.threadIndex)
if handler.worker.onThreadShutdown != nil {
handler.worker.onThreadShutdown(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
// signal to stop

View File

@@ -23,6 +23,8 @@ type worker struct {
threadMutex sync.RWMutex
allowPathMatching bool
maxConsecutiveFailures int
onThreadReady func(int)
onThreadShutdown func(int)
}
var (
@@ -51,12 +53,6 @@ func initWorkers(opt []workerOpt) error {
convertToWorkerThread(thread, w)
go func() {
thread.state.waitFor(stateReady)
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
go startWorker(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()
}
@@ -131,6 +127,8 @@ func newWorker(o workerOpt) (*worker, error) {
threads: make([]*phpThread, 0, o.num),
allowPathMatching: allowPathMatching,
maxConsecutiveFailures: o.maxConsecutiveFailures,
onThreadReady: o.onThreadReady,
onThreadShutdown: o.onThreadShutdown,
}
return w, nil

View File

@@ -1,11 +1,8 @@
package frankenphp
import (
"context"
"log/slog"
"errors"
"net/http"
"sync"
"sync/atomic"
)
// EXPERIMENTAL: Worker allows you to register a worker where, instead of calling FrankenPHP handlers on
@@ -17,164 +14,97 @@ import (
// After the execution of frankenphp_handle_request(), the return value WorkerRequest.AfterFunc will be called,
// with the optional return value of the callback passed as parameter.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env.
//
// Name() and FileName() are only called once at startup, so register them in an init() function.
// A worker script with the provided name, fileName and thread count will be registered, along with additional
// configuration through WorkerOptions.
//
// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Extension workers receive the lowest priority when determining thread allocations. If MinThreads cannot be
// allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type Worker interface {
// Name returns the worker name
Name() string
// FileName returns the PHP script filename
FileName() string
// Env returns the environment variables available in the worker script.
Env() PreparedEnv
// MinThreads returns the minimum number of threads to reserve from the FrankenPHP thread pool.
// This number must be positive.
MinThreads() int
// OnReady is called when the worker is assigned to a thread and receives an opaque thread ID as parameter.
// This is a time for setting up any per-thread resources.
OnReady(threadId int)
// OnShutdown is called when the worker is shutting down and receives an opaque thread ID as parameter.
// This is a time for cleaning up any per-thread resources.
OnShutdown(threadId int)
// OnServerShutdown is called when FrankenPHP is shutting down.
OnServerShutdown(threadId int)
// GetRequest is called once at least one thread is ready.
// The returned request will be passed to the worker script.
GetRequest() *WorkerRequest
// SendRequest sends a request to the worker script. The callback function of frankenphp_handle_request() will be called.
SendRequest(r *WorkerRequest)
}
// EXPERIMENTAL: WorkerRequest represents a request to pass to a worker script.
type WorkerRequest struct {
// Request is an optional HTTP request for your worker script to handle
Request *http.Request
// Response is an optional response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
// CallbackParameters is an optional field that will be converted in PHP types or left as-is if it's an unsafe.Pointer and passed as parameter to the PHP callback
CallbackParameters any
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
AfterFunc func(callbackReturn any)
type Worker struct {
name string
fileName string
num int
options []WorkerOption
}
var extensionWorkers = make(map[string]Worker)
var extensionWorkersMutex sync.Mutex
// EXPERIMENTAL: RegisterWorker registers a custom worker script.
func RegisterWorker(worker Worker) {
extensionWorkersMutex.Lock()
defer extensionWorkersMutex.Unlock()
// EXPERIMENTAL: RegisterWorker registers an external worker.
// external workers are booted together with regular workers on server startup.
func RegisterWorker(worker Worker) error {
if _, exists := extensionWorkers[worker.name]; exists {
return errors.New("worker with this name is already registered: " + worker.name)
}
extensionWorkers[worker.Name()] = worker
extensionWorkers[worker.name] = worker
return nil
}
// startWorker creates a pipe from a worker to the main worker.
func startWorker(w *worker, extensionWorker Worker, thread *phpThread) {
for {
rq := extensionWorker.GetRequest()
// EXPERIMENTAL: SendRequest sends an HTTP request to the worker and writes the response to the provided ResponseWriter.
func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error {
worker := getWorkerByName(w.name)
var fc *frankenPHPContext
if rq.Request == nil {
fc = newFrankenPHPContext()
fc.logger = logger
} else {
fr, err := NewRequestWithContext(rq.Request, WithOriginalRequest(rq.Request))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}
if worker == nil {
return errors.New("worker not found: " + w.name)
}
var ok bool
if fc, ok = fromContext(fr.Context()); !ok {
continue
}
}
fr, err := NewRequestWithContext(
r,
WithOriginalRequest(r),
WithWorkerName(w.name),
)
fc.worker = w
if err != nil {
return err
}
fc.responseWriter = rq.Response
fc.handlerParameters = rq.CallbackParameters
err = ServeHTTP(rw, fr)
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
if err != nil {
return err
}
w.requestChan <- fc
if rq.AfterFunc != nil {
go func() {
<-fc.done
return nil
}
if rq.AfterFunc != nil {
rq.AfterFunc(fc.handlerReturn)
}
}()
}
func (w Worker) NumThreads() int {
worker := getWorkerByName(w.name)
if worker == nil {
return 0
}
return worker.countThreads()
}
// EXPERIMENTAL: SendMessage sends a message to the worker and waits for a response.
func (w Worker) SendMessage(message any, rw http.ResponseWriter) (any, error) {
internalWorker := getWorkerByName(w.name)
if internalWorker == nil {
return nil, errors.New("worker not found: " + w.name)
}
fc := newFrankenPHPContext()
fc.logger = logger
fc.worker = internalWorker
fc.responseWriter = rw
fc.handlerParameters = message
internalWorker.handleRequest(fc)
return fc.handlerReturn, nil
}
// EXPERIMENTAL: NewWorker registers an external worker with the given options
func NewWorker(name string, fileName string, num int, options ...WorkerOption) Worker {
return Worker{
name: name,
fileName: fileName,
num: num,
options: options,
}
}
// EXPERIMENTAL: NewWorker creates a Worker instance to embed in a custom struct implementing the Worker interface.
// The returned instance may be sufficient on its own for simple use cases.
func NewWorker(name, fileName string, minThreads int, env PreparedEnv) Worker {
return &defaultWorker{
name: name,
fileName: fileName,
env: env,
minThreads: minThreads,
requestChan: make(chan *WorkerRequest),
activatedCount: atomic.Int32{},
drainCount: atomic.Int32{},
}
}
type defaultWorker struct {
name string
fileName string
env PreparedEnv
minThreads int
requestChan chan *WorkerRequest
activatedCount atomic.Int32
drainCount atomic.Int32
}
func (w *defaultWorker) Name() string {
return w.name
}
func (w *defaultWorker) FileName() string {
return w.fileName
}
func (w *defaultWorker) Env() PreparedEnv {
return w.env
}
func (w *defaultWorker) MinThreads() int {
return w.minThreads
}
func (w *defaultWorker) OnReady(_ int) {
w.activatedCount.Add(1)
}
func (w *defaultWorker) OnShutdown(_ int) {
w.drainCount.Add(1)
}
func (w *defaultWorker) OnServerShutdown(_ int) {
w.drainCount.Add(-1)
w.activatedCount.Add(-1)
}
func (w *defaultWorker) GetRequest() *WorkerRequest {
return <-w.requestChan
}
func (w *defaultWorker) SendRequest(r *WorkerRequest) {
w.requestChan <- r
}

View File

@@ -4,73 +4,101 @@ import (
"io"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mockWorker implements the Worker interface
type mockWorker struct {
Worker
}
func (*mockWorker) OnShutdown(threadId int) {
//TODO implement me
panic("implement me")
}
func TestWorkerExtension(t *testing.T) {
// Create a mock worker extension
mockExt := &mockWorker{
Worker: NewWorker("mockWorker", "testdata/worker.php", 1, nil),
}
readyWorkers := 0
shutdownWorkers := 0
serverStarts := 0
serverShutDowns := 0
// Register the mock extension
RegisterWorker(mockExt)
externalWorker := NewWorker(
"externalWorker",
"testdata/worker.php",
1,
WithWorkerOnReady(func(id int) {
readyWorkers++
}),
WithWorkerOnShutdown(func(id int) {
serverShutDowns++
}),
WithWorkerOnServerStartup(func() {
serverStarts++
}),
WithWorkerOnServerShutdown(func() {
shutdownWorkers++
}),
)
// Clean up external workers after test to avoid interfering with other tests
assert.NoError(t, RegisterWorker(externalWorker))
require.NoError(t, Init())
defer func() {
delete(extensionWorkers, mockExt.Name())
// Clean up external workers after test to avoid interfering with other tests
delete(extensionWorkers, externalWorker.name)
Shutdown()
assert.Equal(t, 1, shutdownWorkers, "Worker shutdown hook should have been called")
assert.Equal(t, 1, serverShutDowns, "Server shutdown hook should have been called")
}()
// Initialize FrankenPHP with a worker that has a different name than our extension
err := Init()
require.NoError(t, err)
defer Shutdown()
// Wait a bit for the worker to be ready
time.Sleep(100 * time.Millisecond)
// Verify that the extension's thread was activated
assert.GreaterOrEqual(t, int(mockExt.Worker.(*defaultWorker).activatedCount.Load()), 1, "Thread should have been activated")
assert.Equal(t, readyWorkers, 1, "Worker thread should have called onReady()")
assert.Equal(t, serverStarts, 1, "Server start hook should have been called")
assert.Equal(t, externalWorker.NumThreads(), 1, "NumThreads() should report 1 thread")
// Create a test request
req := httptest.NewRequest("GET", "https://example.com/test/?foo=bar", nil)
req.Header.Set("X-Test-Header", "test-value")
w := httptest.NewRecorder()
// Create a channel to signal when the request is done
done := make(chan struct{})
// Inject the request into the worker through the extension
mockExt.SendRequest(&WorkerRequest{
Request: req,
Response: w,
AfterFunc: func(callbackReturn any) {
close(done)
},
})
err := externalWorker.SendRequest(w, req)
assert.NoError(t, err, "Sending request should not produce an error")
// Wait for the request to be fully processed
<-done
// Check the response - now safe from race conditions
resp := w.Result()
body, _ := io.ReadAll(resp.Body)
// The worker.php script should output information about the request
// We're just checking that we got a response, not the specific content
assert.NotEmpty(t, body, "Response body should not be empty")
assert.Contains(t, string(body), "Requests handled: 0", "Response body should contain request information")
}
func TestWorkerExtensionSendMessage(t *testing.T) {
externalWorker := NewWorker("externalWorker", "testdata/message-worker.php", 1)
assert.NoError(t, RegisterWorker(externalWorker))
// Clean up external workers after test to avoid interfering with other tests
defer func() {
delete(extensionWorkers, externalWorker.name)
}()
err := Init()
require.NoError(t, err)
defer Shutdown()
result, err := externalWorker.SendMessage("Hello Worker", nil)
assert.NoError(t, err, "Sending message should not produce an error")
switch v := result.(type) {
case string:
assert.Equal(t, "received message: Hello Worker", v)
default:
t.Fatalf("Expected result to be string, got %T", v)
}
}
func TestErrorIf2WorkersHaveSameName(t *testing.T) {
w := NewWorker("duplicateWorker", "testdata/worker.php", 1)
w2 := NewWorker("duplicateWorker", "testdata/worker2.php", 1)
err := RegisterWorker(w)
require.NoError(t, err, "First registration should succeed")
err = RegisterWorker(w2)
require.Error(t, err, "Second registration with duplicate name should fail")
// Clean up external workers after test to avoid interfering with other tests
extensionWorkers = make(map[string]Worker)
}