mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-12-24 13:38:11 +08:00
refactor: improve ExtensionWorkers API (#1952)
* refactor: improve ExtensionWorkers API * Update workerextension.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update workerextension.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update caddy/app.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * review * fix tests * docs * errors * improved error handling * fix race * add missing return * use %q in Errorf --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
26
caddy/app.go
26
caddy/app.go
@@ -7,6 +7,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/caddyserver/caddy/v2"
|
||||
@@ -18,6 +19,22 @@ import (
|
||||
"github.com/dunglas/frankenphp/internal/fastabs"
|
||||
)
|
||||
|
||||
var (
|
||||
options []frankenphp.Option
|
||||
optionsMU sync.RWMutex
|
||||
)
|
||||
|
||||
// EXPERIMENTAL: RegisterWorkers provides a way for extensions to register frankenphp.Workers
|
||||
func RegisterWorkers(name, fileName string, num int, wo ...frankenphp.WorkerOption) frankenphp.Workers {
|
||||
w, opt := frankenphp.WithExtensionWorkers(name, fileName, num, wo...)
|
||||
|
||||
optionsMU.Lock()
|
||||
options = append(options, opt)
|
||||
optionsMU.Unlock()
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
// FrankenPHPApp represents the global "frankenphp" directive in the Caddyfile
|
||||
// it's responsible for starting up the global PHP instance and all threads
|
||||
//
|
||||
@@ -118,6 +135,11 @@ func (f *FrankenPHPApp) Start() error {
|
||||
frankenphp.WithPhpIni(f.PhpIni),
|
||||
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
|
||||
}
|
||||
|
||||
optionsMU.RLock()
|
||||
opts = append(opts, options...)
|
||||
optionsMU.RUnlock()
|
||||
|
||||
for _, w := range append(f.Workers) {
|
||||
workerOpts := []frankenphp.WorkerOption{
|
||||
frankenphp.WithWorkerEnv(w.Env),
|
||||
@@ -151,6 +173,10 @@ func (f *FrankenPHPApp) Stop() error {
|
||||
f.NumThreads = 0
|
||||
f.MaxWaitTime = 0
|
||||
|
||||
optionsMU.Lock()
|
||||
options = nil
|
||||
optionsMU.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -965,7 +965,7 @@ func TestMaxWaitTime(t *testing.T) {
|
||||
for range 10 {
|
||||
go func() {
|
||||
statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10", t)
|
||||
if statusCode == http.StatusGatewayTimeout {
|
||||
if statusCode == http.StatusServiceUnavailable {
|
||||
success.Store(true)
|
||||
}
|
||||
wg.Done()
|
||||
@@ -973,7 +973,7 @@ func TestMaxWaitTime(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status")
|
||||
require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status")
|
||||
}
|
||||
|
||||
func TestMaxWaitTimeWorker(t *testing.T) {
|
||||
@@ -1012,23 +1012,26 @@ func TestMaxWaitTimeWorker(t *testing.T) {
|
||||
for range 10 {
|
||||
go func() {
|
||||
statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10&iteration=1", t)
|
||||
if statusCode == http.StatusGatewayTimeout {
|
||||
if statusCode == http.StatusServiceUnavailable {
|
||||
success.Store(true)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status")
|
||||
require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status")
|
||||
|
||||
// Fetch metrics
|
||||
resp, err := http.Get("http://localhost:2999/metrics")
|
||||
require.NoError(t, err, "failed to fetch metrics")
|
||||
defer resp.Body.Close()
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, resp.Body.Close())
|
||||
})
|
||||
|
||||
// Read and parse metrics
|
||||
metrics := new(bytes.Buffer)
|
||||
_, err = metrics.ReadFrom(resp.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
expectedMetrics := `
|
||||
# TYPE frankenphp_worker_queue_depth gauge
|
||||
|
||||
@@ -2,6 +2,7 @@ package caddy
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
@@ -192,8 +193,11 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c
|
||||
frankenphp.WithOriginalRequest(&origReq),
|
||||
frankenphp.WithWorkerName(workerName),
|
||||
)
|
||||
if err != nil {
|
||||
return caddyhttp.Error(http.StatusInternalServerError, err)
|
||||
}
|
||||
|
||||
if err = frankenphp.ServeHTTP(w, fr); err != nil {
|
||||
if err = frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, &frankenphp.ErrRejected{}) {
|
||||
return caddyhttp.Error(http.StatusInternalServerError, err)
|
||||
}
|
||||
|
||||
|
||||
34
context.go
34
context.go
@@ -2,6 +2,8 @@ package frankenphp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -117,23 +119,25 @@ func (fc *frankenPHPContext) closeContext() {
|
||||
}
|
||||
|
||||
// validate checks if the request should be outright rejected
|
||||
func (fc *frankenPHPContext) validate() bool {
|
||||
func (fc *frankenPHPContext) validate() error {
|
||||
if strings.Contains(fc.request.URL.Path, "\x00") {
|
||||
fc.rejectBadRequest("Invalid request path")
|
||||
fc.reject(ErrInvalidRequestPath)
|
||||
|
||||
return false
|
||||
return ErrInvalidRequestPath
|
||||
}
|
||||
|
||||
contentLengthStr := fc.request.Header.Get("Content-Length")
|
||||
if contentLengthStr != "" {
|
||||
if contentLength, err := strconv.Atoi(contentLengthStr); err != nil || contentLength < 0 {
|
||||
fc.rejectBadRequest("invalid Content-Length header: " + contentLengthStr)
|
||||
e := fmt.Errorf("%w: %q", ErrInvalidContentLengthHeader, contentLengthStr)
|
||||
|
||||
return false
|
||||
fc.reject(e)
|
||||
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fc *frankenPHPContext) clientHasClosed() bool {
|
||||
@@ -149,16 +153,22 @@ func (fc *frankenPHPContext) clientHasClosed() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// reject sends a response with the given status code and message
|
||||
func (fc *frankenPHPContext) reject(statusCode int, message string) {
|
||||
// reject sends a response with the given status code and error
|
||||
func (fc *frankenPHPContext) reject(err error) {
|
||||
if fc.isDone {
|
||||
return
|
||||
}
|
||||
|
||||
re := &ErrRejected{}
|
||||
if !errors.As(err, re) {
|
||||
// Should never happen
|
||||
panic("only instance of ErrRejected can be passed to reject")
|
||||
}
|
||||
|
||||
rw := fc.responseWriter
|
||||
if rw != nil {
|
||||
rw.WriteHeader(statusCode)
|
||||
_, _ = rw.Write([]byte(message))
|
||||
rw.WriteHeader(re.status)
|
||||
_, _ = rw.Write([]byte(err.Error()))
|
||||
|
||||
if f, ok := rw.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
@@ -167,7 +177,3 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) {
|
||||
|
||||
fc.closeContext()
|
||||
}
|
||||
|
||||
func (fc *frankenPHPContext) rejectBadRequest(message string) {
|
||||
fc.reject(http.StatusBadRequest, message)
|
||||
}
|
||||
|
||||
@@ -52,6 +52,10 @@ var (
|
||||
ErrScriptExecution = errors.New("error during PHP script execution")
|
||||
ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config")
|
||||
|
||||
ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest}
|
||||
ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest}
|
||||
ErrMaxWaitTimeExceeded = ErrRejected{"maximum request handling time exceeded", http.StatusServiceUnavailable}
|
||||
|
||||
isRunning bool
|
||||
onServerShutdown []func()
|
||||
|
||||
@@ -63,34 +67,43 @@ var (
|
||||
maxWaitTime time.Duration
|
||||
)
|
||||
|
||||
type ErrRejected struct {
|
||||
message string
|
||||
status int
|
||||
}
|
||||
|
||||
func (e ErrRejected) Error() string {
|
||||
return e.message
|
||||
}
|
||||
|
||||
type syslogLevel int
|
||||
|
||||
const (
|
||||
emerg syslogLevel = iota // system is unusable
|
||||
alert // action must be taken immediately
|
||||
crit // critical conditions
|
||||
err // error conditions
|
||||
warning // warning conditions
|
||||
notice // normal but significant condition
|
||||
info // informational
|
||||
debug // debug-level messages
|
||||
syslogLevelEmerg syslogLevel = iota // system is unusable
|
||||
syslogLevelAlert // action must be taken immediately
|
||||
syslogLevelCrit // critical conditions
|
||||
syslogLevelErr // error conditions
|
||||
syslogLevelWarn // warning conditions
|
||||
syslogLevelNotice // normal but significant condition
|
||||
syslogLevelInfo // informational
|
||||
syslogLevelDebug // debug-level messages
|
||||
)
|
||||
|
||||
func (l syslogLevel) String() string {
|
||||
switch l {
|
||||
case emerg:
|
||||
case syslogLevelEmerg:
|
||||
return "emerg"
|
||||
case alert:
|
||||
case syslogLevelAlert:
|
||||
return "alert"
|
||||
case crit:
|
||||
case syslogLevelCrit:
|
||||
return "crit"
|
||||
case err:
|
||||
case syslogLevelErr:
|
||||
return "err"
|
||||
case warning:
|
||||
case syslogLevelWarn:
|
||||
return "warning"
|
||||
case notice:
|
||||
case syslogLevelNotice:
|
||||
return "notice"
|
||||
case debug:
|
||||
case syslogLevelDebug:
|
||||
return "debug"
|
||||
default:
|
||||
return "info"
|
||||
@@ -210,11 +223,6 @@ func Init(options ...Option) error {
|
||||
|
||||
registerExtensions()
|
||||
|
||||
// add registered external workers
|
||||
for _, ew := range extensionWorkers {
|
||||
options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...))
|
||||
}
|
||||
|
||||
opt := &opt{}
|
||||
for _, o := range options {
|
||||
if err := o(opt); err != nil {
|
||||
@@ -336,20 +344,17 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
|
||||
|
||||
fc.responseWriter = responseWriter
|
||||
|
||||
if !fc.validate() {
|
||||
return nil
|
||||
if err := fc.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Detect if a worker is available to handle this request
|
||||
if fc.worker != nil {
|
||||
fc.worker.handleRequest(fc)
|
||||
|
||||
return nil
|
||||
return fc.worker.handleRequest(fc)
|
||||
}
|
||||
|
||||
// If no worker was available, send the request to non-worker threads
|
||||
handleRequestWithRegularPHPThreads(fc)
|
||||
return nil
|
||||
return handleRequestWithRegularPHPThreads(fc)
|
||||
}
|
||||
|
||||
//export go_ub_write
|
||||
@@ -566,19 +571,19 @@ func go_log(message *C.char, level C.int) {
|
||||
m := C.GoString(message)
|
||||
|
||||
var le syslogLevel
|
||||
if level < C.int(emerg) || level > C.int(debug) {
|
||||
le = info
|
||||
if level < C.int(syslogLevelEmerg) || level > C.int(syslogLevelDebug) {
|
||||
le = syslogLevelInfo
|
||||
} else {
|
||||
le = syslogLevel(level)
|
||||
}
|
||||
|
||||
switch le {
|
||||
case emerg, alert, crit, err:
|
||||
case syslogLevelEmerg, syslogLevelAlert, syslogLevelCrit, syslogLevelErr:
|
||||
logger.LogAttrs(context.Background(), slog.LevelError, m, slog.String("syslog_level", syslogLevel(level).String()))
|
||||
|
||||
case warning:
|
||||
case syslogLevelWarn:
|
||||
logger.LogAttrs(context.Background(), slog.LevelWarn, m, slog.String("syslog_level", syslogLevel(level).String()))
|
||||
case debug:
|
||||
case syslogLevelDebug:
|
||||
logger.LogAttrs(context.Background(), slog.LevelDebug, m, slog.String("syslog_level", syslogLevel(level).String()))
|
||||
|
||||
default:
|
||||
|
||||
@@ -78,7 +78,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), *
|
||||
}
|
||||
|
||||
err := frankenphp.Init(initOpts...)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
defer frankenphp.Shutdown()
|
||||
|
||||
handler := func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -86,7 +86,9 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), *
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = frankenphp.ServeHTTP(w, req)
|
||||
assert.NoError(t, err)
|
||||
if err != nil && !errors.As(err, &frankenphp.ErrRejected{}) {
|
||||
assert.Fail(t, fmt.Sprintf("Received unexpected error:\n%+v", err))
|
||||
}
|
||||
}
|
||||
|
||||
var ts *httptest.Server
|
||||
@@ -109,6 +111,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), *
|
||||
|
||||
func testRequest(req *http.Request, handler func(http.ResponseWriter, *http.Request), t *testing.T) (string, *http.Response) {
|
||||
t.Helper()
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
handler(w, req)
|
||||
resp := w.Result()
|
||||
@@ -988,7 +991,7 @@ func FuzzRequest(f *testing.F) {
|
||||
// The response status must be 400 if the request path contains null bytes
|
||||
if strings.Contains(req.URL.Path, "\x00") {
|
||||
assert.Equal(t, 400, resp.StatusCode)
|
||||
assert.Contains(t, body, "Invalid request path")
|
||||
assert.Contains(t, body, "invalid request path")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
75
options.go
75
options.go
@@ -35,6 +35,7 @@ type workerOpt struct {
|
||||
env PreparedEnv
|
||||
watch []string
|
||||
maxConsecutiveFailures int
|
||||
extensionWorkers *extensionWorkers
|
||||
onThreadReady func(int)
|
||||
onThreadShutdown func(int)
|
||||
onServerStartup func()
|
||||
@@ -67,7 +68,7 @@ func WithMetrics(m Metrics) Option {
|
||||
}
|
||||
|
||||
// WithWorkers configures the PHP workers to start
|
||||
func WithWorkers(name string, fileName string, num int, options ...WorkerOption) Option {
|
||||
func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option {
|
||||
return func(o *opt) error {
|
||||
worker := workerOpt{
|
||||
name: name,
|
||||
@@ -90,6 +91,54 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption)
|
||||
}
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: WithExtensionWorkers allow extensions to create workers.
|
||||
//
|
||||
// A worker script with the provided name, fileName and thread count will be registered, along with additional
|
||||
// configuration through WorkerOptions.
|
||||
//
|
||||
// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
|
||||
//
|
||||
// Extension workers receive the lowest priority when determining thread allocations. If the requested number of threads
|
||||
// cannot be allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate
|
||||
// more total threads). Don't be greedy.
|
||||
func WithExtensionWorkers(name, fileName string, numThreads int, options ...WorkerOption) (Workers, Option) {
|
||||
w := &extensionWorkers{
|
||||
name: name,
|
||||
fileName: fileName,
|
||||
num: numThreads,
|
||||
}
|
||||
|
||||
w.options = append(options, withExtensionWorkers(w))
|
||||
|
||||
return w, WithWorkers(w.name, w.fileName, w.num, w.options...)
|
||||
}
|
||||
|
||||
// WithLogger configures the global logger to use.
|
||||
func WithLogger(l *slog.Logger) Option {
|
||||
return func(o *opt) error {
|
||||
o.logger = l
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPhpIni configures user defined PHP ini settings.
|
||||
func WithPhpIni(overrides map[string]string) Option {
|
||||
return func(o *opt) error {
|
||||
o.phpIni = overrides
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread.
|
||||
func WithMaxWaitTime(maxWaitTime time.Duration) Option {
|
||||
return func(o *opt) error {
|
||||
o.maxWaitTime = maxWaitTime
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWorkerEnv sets environment variables for the worker
|
||||
func WithWorkerEnv(env map[string]string) WorkerOption {
|
||||
return func(w *workerOpt) error {
|
||||
@@ -154,27 +203,9 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger configures the global logger to use.
|
||||
func WithLogger(l *slog.Logger) Option {
|
||||
return func(o *opt) error {
|
||||
o.logger = l
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPhpIni configures user defined PHP ini settings.
|
||||
func WithPhpIni(overrides map[string]string) Option {
|
||||
return func(o *opt) error {
|
||||
o.phpIni = overrides
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread.
|
||||
func WithMaxWaitTime(maxWaitTime time.Duration) Option {
|
||||
return func(o *opt) error {
|
||||
o.maxWaitTime = maxWaitTime
|
||||
func withExtensionWorkers(w *extensionWorkers) WorkerOption {
|
||||
return func(wo *workerOpt) error {
|
||||
wo.extensionWorkers = w
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -84,14 +84,15 @@ func (handler *regularThread) afterRequest() {
|
||||
handler.requestContext = nil
|
||||
}
|
||||
|
||||
func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) {
|
||||
func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) error {
|
||||
metrics.StartRequest()
|
||||
select {
|
||||
case regularRequestChan <- fc:
|
||||
// a thread was available to handle the request immediately
|
||||
<-fc.done
|
||||
metrics.StopRequest()
|
||||
return
|
||||
|
||||
return nil
|
||||
default:
|
||||
// no thread was available
|
||||
}
|
||||
@@ -104,14 +105,17 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) {
|
||||
metrics.DequeuedRequest()
|
||||
<-fc.done
|
||||
metrics.StopRequest()
|
||||
return
|
||||
|
||||
return nil
|
||||
case scaleChan <- fc:
|
||||
// the request has triggered scaling, continue to wait for a thread
|
||||
case <-timeoutChan(maxWaitTime):
|
||||
// the request has timed out stalling
|
||||
metrics.DequeuedRequest()
|
||||
fc.reject(504, "Gateway Timeout")
|
||||
return
|
||||
|
||||
fc.reject(ErrMaxWaitTimeExceeded)
|
||||
|
||||
return ErrMaxWaitTimeExceeded
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
20
worker.go
20
worker.go
@@ -136,6 +136,10 @@ func newWorker(o workerOpt) (*worker, error) {
|
||||
onThreadShutdown: o.onThreadShutdown,
|
||||
}
|
||||
|
||||
if o.extensionWorkers != nil {
|
||||
o.extensionWorkers.internalWorker = w
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
@@ -224,7 +228,7 @@ func (worker *worker) countThreads() int {
|
||||
return l
|
||||
}
|
||||
|
||||
func (worker *worker) handleRequest(fc *frankenPHPContext) {
|
||||
func (worker *worker) handleRequest(fc *frankenPHPContext) error {
|
||||
metrics.StartWorkerRequest(worker.name)
|
||||
|
||||
// dispatch requests to all worker threads in order
|
||||
@@ -235,7 +239,8 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
|
||||
worker.threadMutex.RUnlock()
|
||||
<-fc.done
|
||||
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
|
||||
return
|
||||
|
||||
return nil
|
||||
default:
|
||||
// thread is busy, continue
|
||||
}
|
||||
@@ -250,14 +255,17 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
|
||||
metrics.DequeuedWorkerRequest(worker.name)
|
||||
<-fc.done
|
||||
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
|
||||
return
|
||||
|
||||
return nil
|
||||
case scaleChan <- fc:
|
||||
// the request has triggered scaling, continue to wait for a thread
|
||||
case <-timeoutChan(maxWaitTime):
|
||||
metrics.DequeuedWorkerRequest(worker.name)
|
||||
// the request has timed out stalling
|
||||
fc.reject(504, "Gateway Timeout")
|
||||
return
|
||||
metrics.DequeuedWorkerRequest(worker.name)
|
||||
|
||||
fc.reject(ErrMaxWaitTimeExceeded)
|
||||
|
||||
return ErrMaxWaitTimeExceeded
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,56 +1,30 @@
|
||||
package frankenphp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// EXPERIMENTAL: Worker allows you to register a worker where, instead of calling FrankenPHP handlers on
|
||||
// frankenphp_handle_request(), the GetRequest method is called.
|
||||
//
|
||||
// You may provide an http.Request that will be conferred to the underlying worker script,
|
||||
// or custom parameters that will be passed to frankenphp_handle_request().
|
||||
//
|
||||
// After the execution of frankenphp_handle_request(), the return value WorkerRequest.AfterFunc will be called,
|
||||
// with the optional return value of the callback passed as parameter.
|
||||
//
|
||||
// A worker script with the provided name, fileName and thread count will be registered, along with additional
|
||||
// configuration through WorkerOptions.
|
||||
//
|
||||
// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
|
||||
//
|
||||
// Extension workers receive the lowest priority when determining thread allocations. If MinThreads cannot be
|
||||
// allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more
|
||||
// total threads). Don't be greedy.
|
||||
type Worker struct {
|
||||
name string
|
||||
fileName string
|
||||
num int
|
||||
options []WorkerOption
|
||||
// EXPERIMENTAL: Workers allows you to register a worker.
|
||||
type Workers interface {
|
||||
// SendRequest calls the closure passed to frankenphp_handle_request() and updates the PHP context .
|
||||
// The generated HTTP response will be written through the provided writer.
|
||||
SendRequest(rw http.ResponseWriter, r *http.Request) error
|
||||
// SendMessage calls the closure passed to frankenphp_handle_request(), passes message as a parameter, and returns the value produced by the closure.
|
||||
SendMessage(message any, rw http.ResponseWriter) (any, error)
|
||||
// NumThreads returns the number of available threads.
|
||||
NumThreads() int
|
||||
}
|
||||
|
||||
var extensionWorkers = make(map[string]Worker)
|
||||
|
||||
// EXPERIMENTAL: RegisterWorker registers an external worker.
|
||||
// external workers are booted together with regular workers on server startup.
|
||||
func RegisterWorker(worker Worker) error {
|
||||
if _, exists := extensionWorkers[worker.name]; exists {
|
||||
return errors.New("worker with this name is already registered: " + worker.name)
|
||||
}
|
||||
|
||||
extensionWorkers[worker.name] = worker
|
||||
|
||||
return nil
|
||||
type extensionWorkers struct {
|
||||
name string
|
||||
fileName string
|
||||
num int
|
||||
options []WorkerOption
|
||||
internalWorker *worker
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: SendRequest sends an HTTP request to the worker and writes the response to the provided ResponseWriter.
|
||||
func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error {
|
||||
worker := getWorkerByName(w.name)
|
||||
|
||||
if worker == nil {
|
||||
return errors.New("worker not found: " + w.name)
|
||||
}
|
||||
|
||||
func (w *extensionWorkers) SendRequest(rw http.ResponseWriter, r *http.Request) error {
|
||||
fr, err := NewRequestWithContext(
|
||||
r,
|
||||
WithOriginalRequest(r),
|
||||
@@ -61,50 +35,22 @@ func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ServeHTTP(rw, fr)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return ServeHTTP(rw, fr)
|
||||
}
|
||||
|
||||
func (w Worker) NumThreads() int {
|
||||
worker := getWorkerByName(w.name)
|
||||
|
||||
if worker == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return worker.countThreads()
|
||||
func (w *extensionWorkers) NumThreads() int {
|
||||
return w.internalWorker.countThreads()
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: SendMessage sends a message to the worker and waits for a response.
|
||||
func (w Worker) SendMessage(message any, rw http.ResponseWriter) (any, error) {
|
||||
internalWorker := getWorkerByName(w.name)
|
||||
|
||||
if internalWorker == nil {
|
||||
return nil, errors.New("worker not found: " + w.name)
|
||||
}
|
||||
|
||||
func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) (any, error) {
|
||||
fc := newFrankenPHPContext()
|
||||
fc.logger = logger
|
||||
fc.worker = internalWorker
|
||||
fc.worker = w.internalWorker
|
||||
fc.responseWriter = rw
|
||||
fc.handlerParameters = message
|
||||
|
||||
internalWorker.handleRequest(fc)
|
||||
err := w.internalWorker.handleRequest(fc)
|
||||
|
||||
return fc.handlerReturn, nil
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: NewWorker registers an external worker with the given options
|
||||
func NewWorker(name string, fileName string, num int, options ...WorkerOption) Worker {
|
||||
return Worker{
|
||||
name: name,
|
||||
fileName: fileName,
|
||||
num: num,
|
||||
options: options,
|
||||
}
|
||||
return fc.handlerReturn, err
|
||||
}
|
||||
|
||||
@@ -9,14 +9,14 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWorkerExtension(t *testing.T) {
|
||||
func TestWorkersExtension(t *testing.T) {
|
||||
readyWorkers := 0
|
||||
shutdownWorkers := 0
|
||||
serverStarts := 0
|
||||
serverShutDowns := 0
|
||||
|
||||
externalWorker := NewWorker(
|
||||
"externalWorker",
|
||||
externalWorkers, o := WithExtensionWorkers(
|
||||
"extensionWorkers",
|
||||
"testdata/worker.php",
|
||||
1,
|
||||
WithWorkerOnReady(func(id int) {
|
||||
@@ -33,20 +33,16 @@ func TestWorkerExtension(t *testing.T) {
|
||||
}),
|
||||
)
|
||||
|
||||
assert.NoError(t, RegisterWorker(externalWorker))
|
||||
|
||||
require.NoError(t, Init())
|
||||
defer func() {
|
||||
// Clean up external workers after test to avoid interfering with other tests
|
||||
delete(extensionWorkers, externalWorker.name)
|
||||
require.NoError(t, Init(o))
|
||||
t.Cleanup(func() {
|
||||
Shutdown()
|
||||
assert.Equal(t, 1, shutdownWorkers, "Worker shutdown hook should have been called")
|
||||
assert.Equal(t, 1, serverShutDowns, "Server shutdown hook should have been called")
|
||||
}()
|
||||
})
|
||||
|
||||
assert.Equal(t, readyWorkers, 1, "Worker thread should have called onReady()")
|
||||
assert.Equal(t, serverStarts, 1, "Server start hook should have been called")
|
||||
assert.Equal(t, externalWorker.NumThreads(), 1, "NumThreads() should report 1 thread")
|
||||
assert.Equal(t, externalWorkers.NumThreads(), 1, "NumThreads() should report 1 thread")
|
||||
|
||||
// Create a test request
|
||||
req := httptest.NewRequest("GET", "https://example.com/test/?foo=bar", nil)
|
||||
@@ -54,7 +50,7 @@ func TestWorkerExtension(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
// Inject the request into the worker through the extension
|
||||
err := externalWorker.SendRequest(w, req)
|
||||
err := externalWorkers.SendRequest(w, req)
|
||||
assert.NoError(t, err, "Sending request should not produce an error")
|
||||
|
||||
resp := w.Result()
|
||||
@@ -67,38 +63,22 @@ func TestWorkerExtension(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWorkerExtensionSendMessage(t *testing.T) {
|
||||
externalWorker := NewWorker("externalWorker", "testdata/message-worker.php", 1)
|
||||
assert.NoError(t, RegisterWorker(externalWorker))
|
||||
externalWorker, o := WithExtensionWorkers("extensionWorkers", "testdata/message-worker.php", 1)
|
||||
|
||||
// Clean up external workers after test to avoid interfering with other tests
|
||||
defer func() {
|
||||
delete(extensionWorkers, externalWorker.name)
|
||||
}()
|
||||
|
||||
err := Init()
|
||||
err := Init(o)
|
||||
require.NoError(t, err)
|
||||
defer Shutdown()
|
||||
t.Cleanup(Shutdown)
|
||||
|
||||
result, err := externalWorker.SendMessage("Hello Worker", nil)
|
||||
assert.NoError(t, err, "Sending message should not produce an error")
|
||||
ret, err := externalWorker.SendMessage("Hello Workers", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
switch v := result.(type) {
|
||||
case string:
|
||||
assert.Equal(t, "received message: Hello Worker", v)
|
||||
default:
|
||||
t.Fatalf("Expected result to be string, got %T", v)
|
||||
}
|
||||
assert.Equal(t, "received message: Hello Workers", ret)
|
||||
}
|
||||
|
||||
func TestErrorIf2WorkersHaveSameName(t *testing.T) {
|
||||
w := NewWorker("duplicateWorker", "testdata/worker.php", 1)
|
||||
w2 := NewWorker("duplicateWorker", "testdata/worker2.php", 1)
|
||||
_, o1 := WithExtensionWorkers("duplicateWorker", "testdata/worker.php", 1)
|
||||
_, o2 := WithExtensionWorkers("duplicateWorker", "testdata/worker2.php", 1)
|
||||
|
||||
err := RegisterWorker(w)
|
||||
require.NoError(t, err, "First registration should succeed")
|
||||
|
||||
err = RegisterWorker(w2)
|
||||
require.Error(t, err, "Second registration with duplicate name should fail")
|
||||
// Clean up external workers after test to avoid interfering with other tests
|
||||
extensionWorkers = make(map[string]Worker)
|
||||
t.Cleanup(Shutdown)
|
||||
require.Error(t, Init(o1, o2))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user