fix: correctly set the Mercure hub for the main worker request

This commit is contained in:
Kévin Dunglas
2025-11-23 23:14:23 +01:00
parent e6b3f70d91
commit 6c764ad9c5
7 changed files with 62 additions and 20 deletions

View File

@@ -114,23 +114,32 @@ retry:
func (f *FrankenPHPApp) addModuleWorkers(workers ...workerConfig) ([]workerConfig, error) {
for i := range workers {
w := &workers[i]
if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(w.FileName) {
w.FileName = filepath.Join(frankenphp.EmbeddedAppPath, w.FileName)
}
if w.Name == "" {
w.Name = f.generateUniqueModuleWorkerName(w.FileName)
} else if !strings.HasPrefix(w.Name, "m#") {
w.Name = "m#" + w.Name
}
f.Workers = append(f.Workers, *w)
}
return workers, nil
}
func (f *FrankenPHPApp) Start() error {
repl := caddy.NewReplacer()
opts := []frankenphp.Option{
optionsMU.RLock()
opts := make([]frankenphp.Option, 0, len(options)+len(f.Workers)+7)
opts = append(opts, options...)
optionsMU.RUnlock()
opts = append(opts,
frankenphp.WithContext(f.ctx),
frankenphp.WithLogger(f.logger),
frankenphp.WithNumThreads(f.NumThreads),
@@ -138,18 +147,27 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithMetrics(f.metrics),
frankenphp.WithPhpIni(f.PhpIni),
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
}
)
optionsMU.RLock()
opts = append(opts, options...)
optionsMU.RUnlock()
for _, w := range f.Workers {
workerOpts := make([]frankenphp.WorkerOption, 0, len(w.requestOptions)+4)
for _, w := range append(f.Workers) {
workerOpts := []frankenphp.WorkerOption{
frankenphp.WithWorkerEnv(w.Env),
frankenphp.WithWorkerWatchMode(w.Watch),
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
if w.requestOptions == nil {
workerOpts = append(workerOpts,
frankenphp.WithWorkerEnv(w.Env),
frankenphp.WithWorkerWatchMode(w.Watch),
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
)
} else {
workerOpts = append(
workerOpts,
frankenphp.WithWorkerEnv(w.Env),
frankenphp.WithWorkerWatchMode(w.Watch),
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
frankenphp.WithWorkerRequestOptions(w.requestOptions...),
)
}
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...))

View File

@@ -74,6 +74,8 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
return fmt.Errorf(`expected ctx.App("frankenphp") to return *FrankenPHPApp, got nil`)
}
f.assignMercureHubRequestOption(ctx)
for i, wc := range f.Workers {
// make the file path absolute from the public directory
// this can only be done if the root is defined inside php_server
@@ -85,6 +87,12 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
if f.Env != nil {
wc.inheritEnv(f.Env)
}
wc.requestOptions = []frankenphp.RequestOption{frankenphp.WithRequestLogger(f.logger)}
if f.mercureHubRequestOption != nil {
wc.requestOptions = append(wc.requestOptions, *f.mercureHubRequestOption)
}
f.Workers[i] = wc
}
@@ -146,8 +154,6 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
}
}
f.assignMercureHubRequestOption(ctx)
return nil
}
@@ -592,7 +598,7 @@ func parsePhpServer(h httpcaddyfile.Helper) ([]httpcaddyfile.ConfigValue, error)
// workers can also match a path without being in the public directory
// in this case we need to prepend the worker routes to the existing routes
func prependWorkerRoutes(routes caddyhttp.RouteList, h httpcaddyfile.Helper, f FrankenPHPModule, fsrv caddy.Module, disableFsrv bool) caddyhttp.RouteList {
allWorkerMatches := caddyhttp.MatchPath{}
var allWorkerMatches caddyhttp.MatchPath
for _, w := range f.Workers {
for _, path := range w.MatchPath {
allWorkerMatches = append(allWorkerMatches, path)
@@ -607,7 +613,7 @@ func prependWorkerRoutes(routes caddyhttp.RouteList, h httpcaddyfile.Helper, f F
if !disableFsrv {
routes = append(routes, caddyhttp.Route{
MatcherSetsRaw: []caddy.ModuleMap{
caddy.ModuleMap{
{
"file": h.JSON(fileserver.MatchFile{
TryFiles: []string{"{http.request.uri.path}"},
Root: f.Root,

View File

@@ -38,6 +38,8 @@ type workerConfig struct {
MatchPath []string `json:"match_path,omitempty"`
// MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick)
MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"`
requestOptions []frankenphp.RequestOption
}
func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {

View File

@@ -100,7 +100,7 @@ func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Reques
// newDummyContext creates a fake context from a request path
func newDummyContext(requestPath string, opts ...RequestOption) (*frankenPHPContext, error) {
r, err := http.NewRequest(http.MethodGet, requestPath, nil)
r, err := http.NewRequestWithContext(globalCtx, http.MethodGet, requestPath, nil)
if err != nil {
return nil, err
}

View File

@@ -36,6 +36,7 @@ type workerOpt struct {
num int
maxThreads int
env PreparedEnv
requestOptions []RequestOption
watch []string
maxConsecutiveFailures int
extensionWorkers *extensionWorkers
@@ -160,6 +161,15 @@ func WithWorkerEnv(env map[string]string) WorkerOption {
}
}
// WithWorkerRequestOptions sets options for the main dummy request created for the worker
func WithWorkerRequestOptions(options ...RequestOption) WorkerOption {
return func(w *workerOpt) error {
w.requestOptions = append(w.requestOptions, options...)
return nil
}
}
// WithWorkerMaxThreads sets the max number of threads for this specific worker
func WithWorkerMaxThreads(num int) WorkerOption {
return func(w *workerOpt) error {

View File

@@ -110,8 +110,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) {
// Create a dummy request to set up the worker
fc, err := newDummyContext(
filepath.Base(worker.fileName),
WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
WithRequestPreparedEnv(worker.env),
worker.requestOptions...,
)
if err != nil {
panic(err)

View File

@@ -5,6 +5,7 @@ import "C"
import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -19,7 +20,7 @@ type worker struct {
fileName string
num int
maxThreads int
env PreparedEnv
requestOptions []RequestOption
requestChan chan contextHolder
threads []*phpThread
threadMutex sync.RWMutex
@@ -127,9 +128,9 @@ func newWorker(o workerOpt) (*worker, error) {
w := &worker{
name: o.name,
fileName: absFileName,
requestOptions: o.requestOptions,
num: o.num,
maxThreads: o.maxThreads,
env: o.env,
requestChan: make(chan contextHolder),
threads: make([]*phpThread, 0, o.num),
allowPathMatching: allowPathMatching,
@@ -138,6 +139,12 @@ func newWorker(o workerOpt) (*worker, error) {
onThreadShutdown: o.onThreadShutdown,
}
w.requestOptions = append(
w.requestOptions,
WithRequestDocumentRoot(filepath.Dir(o.fileName), false),
WithRequestPreparedEnv(o.env),
)
if o.extensionWorkers != nil {
o.extensionWorkers.internalWorker = w
}