mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-12-24 13:38:11 +08:00
Compare commits
14 Commits
detached
...
add/modula
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26a84c5700 | ||
|
|
0d1daf6685 | ||
|
|
9cfcf4c0df | ||
|
|
6c9cfd608f | ||
|
|
d2432c6ab3 | ||
|
|
0ec99d53de | ||
|
|
8318763341 | ||
|
|
b23db79d2d | ||
|
|
ceec4a0d3d | ||
|
|
3cb1e273d9 | ||
|
|
2e7e969b04 | ||
|
|
b7ff2b42ad | ||
|
|
cd7c51d69b | ||
|
|
d0d6747742 |
@@ -214,6 +214,11 @@ func Init(options ...Option) error {
|
||||
|
||||
registerExtensions()
|
||||
|
||||
// add registered external workers
|
||||
for _, ew := range externalWorkers {
|
||||
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env())))
|
||||
}
|
||||
|
||||
opt := &opt{}
|
||||
for _, o := range options {
|
||||
if err := o(opt); err != nil {
|
||||
|
||||
93
threadFramework.go
Normal file
93
threadFramework.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
|
||||
// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard
|
||||
// http.Request that will be conferred to the underlying worker script.
|
||||
//
|
||||
// A worker script with the provided Name and FileName will be registered, along with the provided
|
||||
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
|
||||
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
|
||||
// These methods are only called once at startup, so register them in an init() function.
|
||||
//
|
||||
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
|
||||
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
|
||||
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
|
||||
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
|
||||
//
|
||||
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
|
||||
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
|
||||
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
|
||||
//
|
||||
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
|
||||
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
|
||||
// total threads). Don't be greedy.
|
||||
type WorkerExtension interface {
|
||||
Name() string
|
||||
FileName() string
|
||||
Env() PreparedEnv
|
||||
GetMinThreads() int
|
||||
ThreadActivatedNotification(threadId int)
|
||||
ThreadDrainNotification(threadId int)
|
||||
ThreadDeactivatedNotification(threadId int)
|
||||
ProvideRequest() *WorkerRequest
|
||||
}
|
||||
|
||||
type WorkerRequest struct {
|
||||
// The request for your worker script to handle
|
||||
Request *http.Request
|
||||
// Response is a response writer that provides the output of the provided request
|
||||
Response http.ResponseWriter
|
||||
// Done is an optional channel that will be closed when the request processing is complete
|
||||
Done chan struct{}
|
||||
}
|
||||
|
||||
var externalWorkers = make(map[string]WorkerExtension)
|
||||
var externalWorkerMutex sync.Mutex
|
||||
|
||||
func RegisterExternalWorker(worker WorkerExtension) {
|
||||
externalWorkerMutex.Lock()
|
||||
defer externalWorkerMutex.Unlock()
|
||||
|
||||
externalWorkers[worker.Name()] = worker
|
||||
}
|
||||
|
||||
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
|
||||
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
|
||||
for {
|
||||
rq := externalWorker.ProvideRequest()
|
||||
|
||||
if rq == nil || rq.Request == nil {
|
||||
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
|
||||
continue
|
||||
}
|
||||
|
||||
r := rq.Request
|
||||
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
|
||||
if err != nil {
|
||||
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
|
||||
continue
|
||||
}
|
||||
|
||||
if fc, ok := fromContext(fr.Context()); ok {
|
||||
fc.responseWriter = rq.Response
|
||||
|
||||
// Queue the request and wait for completion if Done channel was provided
|
||||
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name))
|
||||
|
||||
w.requestChan <- fc
|
||||
if rq.Done != nil {
|
||||
go func() {
|
||||
<-fc.done
|
||||
close(rq.Done)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
134
threadFramework_test.go
Normal file
134
threadFramework_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// mockWorkerExtension implements the WorkerExtension interface
|
||||
type mockWorkerExtension struct {
|
||||
name string
|
||||
fileName string
|
||||
env PreparedEnv
|
||||
minThreads int
|
||||
requestChan chan *WorkerRequest
|
||||
activatedCount int
|
||||
drainCount int
|
||||
deactivatedCount int
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
|
||||
return &mockWorkerExtension{
|
||||
name: name,
|
||||
fileName: fileName,
|
||||
env: make(PreparedEnv),
|
||||
minThreads: minThreads,
|
||||
requestChan: make(chan *WorkerRequest, 10), // Buffer to avoid blocking
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) Name() string {
|
||||
return m.name
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) FileName() string {
|
||||
return m.fileName
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) Env() PreparedEnv {
|
||||
return m.env
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) GetMinThreads() int {
|
||||
return m.minThreads
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.activatedCount++
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.drainCount++
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.deactivatedCount++
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest {
|
||||
return <-m.requestChan
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest) {
|
||||
m.requestChan <- r
|
||||
}
|
||||
|
||||
func (m *mockWorkerExtension) GetActivatedCount() int {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.activatedCount
|
||||
}
|
||||
|
||||
func TestWorkerExtension(t *testing.T) {
|
||||
// Create a mock extension
|
||||
mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1)
|
||||
|
||||
// Register the mock extension
|
||||
RegisterExternalWorker(mockExt)
|
||||
|
||||
// Clean up external workers after test to avoid interfering with other tests
|
||||
defer func() {
|
||||
delete(externalWorkers, mockExt.Name())
|
||||
}()
|
||||
|
||||
// Initialize FrankenPHP with a worker that has a different name than our extension
|
||||
err := Init()
|
||||
require.NoError(t, err)
|
||||
defer Shutdown()
|
||||
|
||||
// Wait a bit for the worker to be ready
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Verify that the extension's thread was activated
|
||||
assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated")
|
||||
|
||||
// Create a test request
|
||||
req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil)
|
||||
req.Header.Set("X-Test-Header", "test-value")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
// Create a channel to signal when the request is done
|
||||
done := make(chan struct{})
|
||||
|
||||
// Inject the request into the worker through the extension
|
||||
mockExt.InjectRequest(&WorkerRequest{
|
||||
Request: req,
|
||||
Response: w,
|
||||
Done: done,
|
||||
})
|
||||
|
||||
// Wait for the request to be fully processed
|
||||
<-done
|
||||
|
||||
// Check the response - now safe from race conditions
|
||||
resp := w.Result()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
// The worker.php script should output information about the request
|
||||
// We're just checking that we got a response, not the specific content
|
||||
assert.NotEmpty(t, body, "Response body should not be empty")
|
||||
}
|
||||
@@ -19,10 +19,13 @@ type workerThread struct {
|
||||
dummyContext *frankenPHPContext
|
||||
workerContext *frankenPHPContext
|
||||
backoff *exponentialBackoff
|
||||
externalWorker WorkerExtension
|
||||
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
||||
}
|
||||
|
||||
func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
externalWorker := externalWorkers[worker.name]
|
||||
|
||||
thread.setHandler(&workerThread{
|
||||
state: thread.state,
|
||||
thread: thread,
|
||||
@@ -32,6 +35,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
minBackoff: 100 * time.Millisecond,
|
||||
maxConsecutiveFailures: worker.maxConsecutiveFailures,
|
||||
},
|
||||
externalWorker: externalWorker,
|
||||
})
|
||||
worker.attachThread(thread)
|
||||
}
|
||||
@@ -40,16 +44,28 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
func (handler *workerThread) beforeScriptExecution() string {
|
||||
switch handler.state.get() {
|
||||
case stateTransitionRequested:
|
||||
if handler.externalWorker != nil {
|
||||
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
|
||||
}
|
||||
handler.worker.detachThread(handler.thread)
|
||||
return handler.thread.transitionToNewHandler()
|
||||
case stateRestarting:
|
||||
if handler.externalWorker != nil {
|
||||
handler.externalWorker.ThreadDrainNotification(handler.thread.threadIndex)
|
||||
}
|
||||
handler.state.set(stateYielding)
|
||||
handler.state.waitFor(stateReady, stateShuttingDown)
|
||||
return handler.beforeScriptExecution()
|
||||
case stateReady, stateTransitionComplete:
|
||||
if handler.externalWorker != nil {
|
||||
handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex)
|
||||
}
|
||||
setupWorkerScript(handler, handler.worker)
|
||||
return handler.worker.fileName
|
||||
case stateShuttingDown:
|
||||
if handler.externalWorker != nil {
|
||||
handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex)
|
||||
}
|
||||
handler.worker.detachThread(handler.thread)
|
||||
// signal to stop
|
||||
return ""
|
||||
|
||||
14
worker.go
14
worker.go
@@ -44,13 +44,19 @@ func initWorkers(opt []workerOpt) error {
|
||||
workers = append(workers, w)
|
||||
}
|
||||
|
||||
for _, worker := range workers {
|
||||
workersReady.Add(worker.num)
|
||||
for i := 0; i < worker.num; i++ {
|
||||
for _, w := range workers {
|
||||
workersReady.Add(w.num)
|
||||
for i := 0; i < w.num; i++ {
|
||||
thread := getInactivePHPThread()
|
||||
convertToWorkerThread(thread, worker)
|
||||
convertToWorkerThread(thread, w)
|
||||
go func() {
|
||||
thread.state.waitFor(stateReady)
|
||||
|
||||
// create a pipe from the external worker to the main worker
|
||||
// note: this is locked to the initial thread size the external worker requested
|
||||
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
|
||||
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
|
||||
}
|
||||
workersReady.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user