mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-12-24 13:38:11 +08:00
This patch brings hot reloading capabilities to PHP apps: in development, the browser will automatically refresh the page when any source file changes! It's similar to HMR in JavaScript. It is built on top of [the watcher mechanism](https://frankenphp.dev/docs/config/#watching-for-file-changes) and of the [Mercure](https://frankenphp.dev/docs/mercure/) integration. Each time a watched file is modified, a Mercure update is sent, giving the ability to the client to reload the page, or part of the page (assets, images...). Here is an example implementation: ```caddyfile root ./public mercure { subscriber_jwt {env.MERCURE_SUBSCRIBER_JWT_KEY} anonymous } php_server { hot_reload } ``` ```php <?php header('Content-Type: text/html'); ?> <!DOCTYPE html> <html lang="en"> <head> <title>Test</title> <script> const es = new EventSource('<?=$_SERVER['FRANKENPHP_HOT_RELOAD']?>'); es.onmessage = () => location.reload(); </script> </head> <body> Hello ``` I plan to create a helper JS library to handle more advanced cases (reloading CSS, JS, etc), similar to [HotWire Spark](https://github.com/hotwired/spark). Be sure to attend my SymfonyCon to learn more! There is still room for improvement: - Provide an option to only trigger the update without reloading the worker for some files (ex, images, JS, CSS...) - Support classic mode (currently, only the worker mode is supported) - Don't reload all workers when only the files used by one change However, this PR is working as-is and can be merged as a first step. This patch heavily refactors the watcher module. Maybe it will be possible to extract it as a standalone library at some point (would be useful to add a similar feature but not tight to PHP as a Caddy module). --------- Signed-off-by: Kévin Dunglas <kevin@dunglas.fr> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
321 lines
7.4 KiB
Go
321 lines
7.4 KiB
Go
package frankenphp
|
|
|
|
// #include "frankenphp.h"
|
|
import "C"
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/dunglas/frankenphp/internal/fastabs"
|
|
"github.com/dunglas/frankenphp/internal/state"
|
|
)
|
|
|
|
// represents a worker script and can have many threads assigned to it
|
|
type worker struct {
|
|
mercureContext
|
|
|
|
name string
|
|
fileName string
|
|
num int
|
|
maxThreads int
|
|
requestOptions []RequestOption
|
|
requestChan chan contextHolder
|
|
threads []*phpThread
|
|
threadMutex sync.RWMutex
|
|
allowPathMatching bool
|
|
maxConsecutiveFailures int
|
|
onThreadReady func(int)
|
|
onThreadShutdown func(int)
|
|
queuedRequests atomic.Int32
|
|
}
|
|
|
|
var (
|
|
workers []*worker
|
|
watcherIsEnabled bool
|
|
startupFailChan chan error
|
|
)
|
|
|
|
func initWorkers(opt []workerOpt) error {
|
|
if len(opt) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
workersReady sync.WaitGroup
|
|
totalThreadsToStart int
|
|
)
|
|
|
|
workers = make([]*worker, 0, len(opt))
|
|
|
|
for _, o := range opt {
|
|
w, err := newWorker(o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
totalThreadsToStart += w.num
|
|
workers = append(workers, w)
|
|
}
|
|
|
|
startupFailChan = make(chan error, totalThreadsToStart)
|
|
|
|
for _, w := range workers {
|
|
for i := 0; i < w.num; i++ {
|
|
thread := getInactivePHPThread()
|
|
convertToWorkerThread(thread, w)
|
|
|
|
workersReady.Go(func() {
|
|
thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done)
|
|
})
|
|
}
|
|
}
|
|
|
|
workersReady.Wait()
|
|
|
|
select {
|
|
case err := <-startupFailChan:
|
|
// at least 1 worker has failed, return an error
|
|
return fmt.Errorf("failed to initialize workers: %w", err)
|
|
default:
|
|
// all workers started successfully
|
|
startupFailChan = nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getWorkerByName(name string) *worker {
|
|
for _, w := range workers {
|
|
if w.name == name {
|
|
return w
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getWorkerByPath(path string) *worker {
|
|
for _, w := range workers {
|
|
if w.fileName == path && w.allowPathMatching {
|
|
return w
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func newWorker(o workerOpt) (*worker, error) {
|
|
absFileName, err := fastabs.FastAbs(o.fileName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
|
|
}
|
|
|
|
if _, err := os.Stat(absFileName); err != nil {
|
|
return nil, fmt.Errorf("worker file not found %q: %w", absFileName, err)
|
|
}
|
|
|
|
if o.name == "" {
|
|
o.name = absFileName
|
|
}
|
|
|
|
// workers that have a name starting with "m#" are module workers
|
|
// they can only be matched by their name, not by their path
|
|
allowPathMatching := !strings.HasPrefix(o.name, "m#")
|
|
|
|
if w := getWorkerByPath(absFileName); w != nil && allowPathMatching {
|
|
return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName)
|
|
}
|
|
if w := getWorkerByName(o.name); w != nil {
|
|
return w, fmt.Errorf("two workers cannot have the same name: %q", o.name)
|
|
}
|
|
|
|
if o.env == nil {
|
|
o.env = make(PreparedEnv, 1)
|
|
}
|
|
|
|
o.env["FRANKENPHP_WORKER\x00"] = "1"
|
|
w := &worker{
|
|
name: o.name,
|
|
fileName: absFileName,
|
|
requestOptions: o.requestOptions,
|
|
num: o.num,
|
|
maxThreads: o.maxThreads,
|
|
requestChan: make(chan contextHolder),
|
|
threads: make([]*phpThread, 0, o.num),
|
|
allowPathMatching: allowPathMatching,
|
|
maxConsecutiveFailures: o.maxConsecutiveFailures,
|
|
onThreadReady: o.onThreadReady,
|
|
onThreadShutdown: o.onThreadShutdown,
|
|
}
|
|
|
|
w.configureMercure(&o)
|
|
|
|
w.requestOptions = append(
|
|
w.requestOptions,
|
|
WithRequestDocumentRoot(filepath.Dir(o.fileName), false),
|
|
WithRequestPreparedEnv(o.env),
|
|
)
|
|
|
|
if o.extensionWorkers != nil {
|
|
o.extensionWorkers.internalWorker = w
|
|
}
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
|
|
func DrainWorkers() {
|
|
_ = drainWorkerThreads()
|
|
}
|
|
|
|
func drainWorkerThreads() []*phpThread {
|
|
var (
|
|
ready sync.WaitGroup
|
|
drainedThreads []*phpThread
|
|
)
|
|
|
|
for _, worker := range workers {
|
|
worker.threadMutex.RLock()
|
|
ready.Add(len(worker.threads))
|
|
|
|
for _, thread := range worker.threads {
|
|
if !thread.state.RequestSafeStateChange(state.Restarting) {
|
|
ready.Done()
|
|
|
|
// no state change allowed == thread is shutting down
|
|
// we'll proceed to restart all other threads anyway
|
|
continue
|
|
}
|
|
|
|
close(thread.drainChan)
|
|
drainedThreads = append(drainedThreads, thread)
|
|
|
|
go func(thread *phpThread) {
|
|
thread.state.WaitFor(state.Yielding)
|
|
ready.Done()
|
|
}(thread)
|
|
}
|
|
|
|
worker.threadMutex.RUnlock()
|
|
}
|
|
|
|
ready.Wait()
|
|
|
|
return drainedThreads
|
|
}
|
|
|
|
// RestartWorkers attempts to restart all workers gracefully
|
|
// All workers must be restarted at the same time to prevent issues with opcache resetting.
|
|
func RestartWorkers() {
|
|
// disallow scaling threads while restarting workers
|
|
scalingMu.Lock()
|
|
defer scalingMu.Unlock()
|
|
|
|
threadsToRestart := drainWorkerThreads()
|
|
|
|
for _, thread := range threadsToRestart {
|
|
thread.drainChan = make(chan struct{})
|
|
thread.state.Set(state.Ready)
|
|
}
|
|
}
|
|
|
|
func (worker *worker) attachThread(thread *phpThread) {
|
|
worker.threadMutex.Lock()
|
|
worker.threads = append(worker.threads, thread)
|
|
worker.threadMutex.Unlock()
|
|
}
|
|
|
|
func (worker *worker) detachThread(thread *phpThread) {
|
|
worker.threadMutex.Lock()
|
|
for i, t := range worker.threads {
|
|
if t == thread {
|
|
worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
worker.threadMutex.Unlock()
|
|
}
|
|
|
|
func (worker *worker) countThreads() int {
|
|
worker.threadMutex.RLock()
|
|
l := len(worker.threads)
|
|
worker.threadMutex.RUnlock()
|
|
|
|
return l
|
|
}
|
|
|
|
// check if max_threads has been reached
|
|
func (worker *worker) isAtThreadLimit() bool {
|
|
if worker.maxThreads <= 0 {
|
|
return false
|
|
}
|
|
|
|
worker.threadMutex.RLock()
|
|
atMaxThreads := len(worker.threads) >= worker.maxThreads
|
|
worker.threadMutex.RUnlock()
|
|
|
|
return atMaxThreads
|
|
}
|
|
|
|
func (worker *worker) handleRequest(ch contextHolder) error {
|
|
metrics.StartWorkerRequest(worker.name)
|
|
|
|
runtime.Gosched()
|
|
|
|
if worker.queuedRequests.Load() == 0 {
|
|
// dispatch requests to all worker threads in order
|
|
worker.threadMutex.RLock()
|
|
for _, thread := range worker.threads {
|
|
select {
|
|
case thread.requestChan <- ch:
|
|
worker.threadMutex.RUnlock()
|
|
<-ch.frankenPHPContext.done
|
|
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
|
|
|
|
return nil
|
|
default:
|
|
// thread is busy, continue
|
|
}
|
|
}
|
|
worker.threadMutex.RUnlock()
|
|
}
|
|
|
|
// if no thread was available, mark the request as queued and apply the scaling strategy
|
|
worker.queuedRequests.Add(1)
|
|
metrics.QueuedWorkerRequest(worker.name)
|
|
|
|
for {
|
|
workerScaleChan := scaleChan
|
|
if worker.isAtThreadLimit() {
|
|
workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling
|
|
}
|
|
|
|
select {
|
|
case worker.requestChan <- ch:
|
|
worker.queuedRequests.Add(-1)
|
|
metrics.DequeuedWorkerRequest(worker.name)
|
|
<-ch.frankenPHPContext.done
|
|
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
|
|
|
|
return nil
|
|
case workerScaleChan <- ch.frankenPHPContext:
|
|
// the request has triggered scaling, continue to wait for a thread
|
|
case <-timeoutChan(maxWaitTime):
|
|
// the request has timed out stalling
|
|
worker.queuedRequests.Add(-1)
|
|
metrics.DequeuedWorkerRequest(worker.name)
|
|
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
|
|
|
|
ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)
|
|
|
|
return ErrMaxWaitTimeExceeded
|
|
}
|
|
}
|
|
}
|