Compare commits

...

14 Commits

Author SHA1 Message Date
Kévin Dunglas
26a84c5700 Simplify 2025-09-13 13:57:37 +02:00
Robert Landers
0d1daf6685 add more comments
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
9cfcf4c0df we never shutdown workers or remove them, so we do not need this
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
6c9cfd608f add a way to detect when a request is completed
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
d2432c6ab3 fix test
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
0ec99d53de pipes are tied to workers, not threads
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
8318763341 add tests
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
b23db79d2d add error handling and handle shutdowns
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
ceec4a0d3d satisfy linter
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
3cb1e273d9 simplify providing a request
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
2e7e969b04 remove mention of an old function that no longer exists
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
b7ff2b42ad fix comment
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:29 +02:00
Robert Landers
cd7c51d69b add tests
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:28 +02:00
Robert Landers
d0d6747742 create a simple thread framework
Signed-off-by: Robert Landers <landers.robert@gmail.com>
2025-09-12 12:01:28 +02:00
5 changed files with 258 additions and 4 deletions

View File

@@ -214,6 +214,11 @@ func Init(options ...Option) error {
registerExtensions()
// add registered external workers
for _, ew := range externalWorkers {
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env())))
}
opt := &opt{}
for _, o := range options {
if err := o(opt); err != nil {

93
threadFramework.go Normal file
View File

@@ -0,0 +1,93 @@
package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
)
// WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard
// http.Request that will be conferred to the underlying worker script.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
// These methods are only called once at startup, so register them in an init() function.
//
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type WorkerExtension interface {
Name() string
FileName() string
Env() PreparedEnv
GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
ProvideRequest() *WorkerRequest
}
type WorkerRequest struct {
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request
Response http.ResponseWriter
// Done is an optional channel that will be closed when the request processing is complete
Done chan struct{}
}
var externalWorkers = make(map[string]WorkerExtension)
var externalWorkerMutex sync.Mutex
func RegisterExternalWorker(worker WorkerExtension) {
externalWorkerMutex.Lock()
defer externalWorkerMutex.Unlock()
externalWorkers[worker.Name()] = worker
}
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
for {
rq := externalWorker.ProvideRequest()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name))
w.requestChan <- fc
if rq.Done != nil {
go func() {
<-fc.done
close(rq.Done)
}()
}
}
}
}

134
threadFramework_test.go Normal file
View File

@@ -0,0 +1,134 @@
package frankenphp
import (
"io"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mockWorkerExtension implements the WorkerExtension interface
type mockWorkerExtension struct {
name string
fileName string
env PreparedEnv
minThreads int
requestChan chan *WorkerRequest
activatedCount int
drainCount int
deactivatedCount int
mu sync.Mutex
}
func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
return &mockWorkerExtension{
name: name,
fileName: fileName,
env: make(PreparedEnv),
minThreads: minThreads,
requestChan: make(chan *WorkerRequest, 10), // Buffer to avoid blocking
}
}
func (m *mockWorkerExtension) Name() string {
return m.name
}
func (m *mockWorkerExtension) FileName() string {
return m.fileName
}
func (m *mockWorkerExtension) Env() PreparedEnv {
return m.env
}
func (m *mockWorkerExtension) GetMinThreads() int {
return m.minThreads
}
func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.activatedCount++
}
func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.drainCount++
}
func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.deactivatedCount++
}
func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest {
return <-m.requestChan
}
func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest) {
m.requestChan <- r
}
func (m *mockWorkerExtension) GetActivatedCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.activatedCount
}
func TestWorkerExtension(t *testing.T) {
// Create a mock extension
mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1)
// Register the mock extension
RegisterExternalWorker(mockExt)
// Clean up external workers after test to avoid interfering with other tests
defer func() {
delete(externalWorkers, mockExt.Name())
}()
// Initialize FrankenPHP with a worker that has a different name than our extension
err := Init()
require.NoError(t, err)
defer Shutdown()
// Wait a bit for the worker to be ready
time.Sleep(100 * time.Millisecond)
// Verify that the extension's thread was activated
assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated")
// Create a test request
req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil)
req.Header.Set("X-Test-Header", "test-value")
w := httptest.NewRecorder()
// Create a channel to signal when the request is done
done := make(chan struct{})
// Inject the request into the worker through the extension
mockExt.InjectRequest(&WorkerRequest{
Request: req,
Response: w,
Done: done,
})
// Wait for the request to be fully processed
<-done
// Check the response - now safe from race conditions
resp := w.Result()
body, _ := io.ReadAll(resp.Body)
// The worker.php script should output information about the request
// We're just checking that we got a response, not the specific content
assert.NotEmpty(t, body, "Response body should not be empty")
}

View File

@@ -19,10 +19,13 @@ type workerThread struct {
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *exponentialBackoff
externalWorker WorkerExtension
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
externalWorker := externalWorkers[worker.name]
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
@@ -32,6 +35,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: worker.maxConsecutiveFailures,
},
externalWorker: externalWorker,
})
worker.attachThread(thread)
}
@@ -40,16 +44,28 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
func (handler *workerThread) beforeScriptExecution() string {
switch handler.state.get() {
case stateTransitionRequested:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
return handler.thread.transitionToNewHandler()
case stateRestarting:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDrainNotification(handler.thread.threadIndex)
}
handler.state.set(stateYielding)
handler.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateReady, stateTransitionComplete:
if handler.externalWorker != nil {
handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex)
}
setupWorkerScript(handler, handler.worker)
return handler.worker.fileName
case stateShuttingDown:
if handler.externalWorker != nil {
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
}
handler.worker.detachThread(handler.thread)
// signal to stop
return ""

View File

@@ -44,13 +44,19 @@ func initWorkers(opt []workerOpt) error {
workers = append(workers, w)
}
for _, worker := range workers {
workersReady.Add(worker.num)
for i := 0; i < worker.num; i++ {
for _, w := range workers {
workersReady.Add(w.num)
for i := 0; i < w.num; i++ {
thread := getInactivePHPThread()
convertToWorkerThread(thread, worker)
convertToWorkerThread(thread, w)
go func() {
thread.state.waitFor(stateReady)
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()
}