mirror of
https://github.com/dunglas/frankenphp.git
synced 2025-09-26 19:41:13 +08:00
261 lines
6.2 KiB
Go
261 lines
6.2 KiB
Go
package frankenphp
|
|
|
|
// #include "frankenphp.h"
|
|
import "C"
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/dunglas/frankenphp/internal/fastabs"
|
|
"github.com/dunglas/frankenphp/internal/watcher"
|
|
)
|
|
|
|
// represents a worker script and can have many threads assigned to it
|
|
type worker struct {
|
|
name string
|
|
fileName string
|
|
num int
|
|
env PreparedEnv
|
|
requestChan chan *frankenPHPContext
|
|
threads []*phpThread
|
|
threadMutex sync.RWMutex
|
|
allowPathMatching bool
|
|
maxConsecutiveFailures int
|
|
}
|
|
|
|
var (
|
|
workers []*worker
|
|
watcherIsEnabled bool
|
|
)
|
|
|
|
func initWorkers(opt []workerOpt) error {
|
|
workers = make([]*worker, 0, len(opt))
|
|
workersReady := sync.WaitGroup{}
|
|
directoriesToWatch := getDirectoriesToWatch(opt)
|
|
watcherIsEnabled = len(directoriesToWatch) > 0
|
|
|
|
for _, o := range opt {
|
|
w, err := newWorker(o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
workers = append(workers, w)
|
|
}
|
|
|
|
for _, w := range workers {
|
|
workersReady.Add(w.num)
|
|
for i := 0; i < w.num; i++ {
|
|
thread := getInactivePHPThread()
|
|
convertToWorkerThread(thread, w)
|
|
go func() {
|
|
thread.state.waitFor(stateReady)
|
|
|
|
// create a pipe from the external worker to the main worker
|
|
// note: this is locked to the initial thread size the external worker requested
|
|
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
|
|
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
|
|
}
|
|
workersReady.Done()
|
|
}()
|
|
}
|
|
}
|
|
|
|
workersReady.Wait()
|
|
|
|
if !watcherIsEnabled {
|
|
return nil
|
|
}
|
|
|
|
watcherIsEnabled = true
|
|
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getWorkerByName(name string) *worker {
|
|
for _, w := range workers {
|
|
if w.name == name {
|
|
return w
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getWorkerByPath(path string) *worker {
|
|
for _, w := range workers {
|
|
if w.fileName == path && w.allowPathMatching {
|
|
return w
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func newWorker(o workerOpt) (*worker, error) {
|
|
absFileName, err := fastabs.FastAbs(o.fileName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
|
|
}
|
|
|
|
if o.name == "" {
|
|
o.name = absFileName
|
|
}
|
|
|
|
// workers that have a name starting with "m#" are module workers
|
|
// they can only be matched by their name, not by their path
|
|
allowPathMatching := !strings.HasPrefix(o.name, "m#")
|
|
|
|
if w := getWorkerByPath(absFileName); w != nil && allowPathMatching {
|
|
return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName)
|
|
}
|
|
if w := getWorkerByName(o.name); w != nil {
|
|
return w, fmt.Errorf("two workers cannot have the same name: %q", o.name)
|
|
}
|
|
|
|
if o.env == nil {
|
|
o.env = make(PreparedEnv, 1)
|
|
}
|
|
|
|
o.env["FRANKENPHP_WORKER\x00"] = "1"
|
|
w := &worker{
|
|
name: o.name,
|
|
fileName: absFileName,
|
|
num: o.num,
|
|
env: o.env,
|
|
requestChan: make(chan *frankenPHPContext),
|
|
threads: make([]*phpThread, 0, o.num),
|
|
allowPathMatching: allowPathMatching,
|
|
maxConsecutiveFailures: o.maxConsecutiveFailures,
|
|
}
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
|
|
func DrainWorkers() {
|
|
_ = drainWorkerThreads()
|
|
}
|
|
|
|
func drainWorkerThreads() []*phpThread {
|
|
ready := sync.WaitGroup{}
|
|
drainedThreads := make([]*phpThread, 0)
|
|
for _, worker := range workers {
|
|
worker.threadMutex.RLock()
|
|
ready.Add(len(worker.threads))
|
|
for _, thread := range worker.threads {
|
|
if !thread.state.requestSafeStateChange(stateRestarting) {
|
|
ready.Done()
|
|
// no state change allowed == thread is shutting down
|
|
// we'll proceed to restart all other threads anyways
|
|
continue
|
|
}
|
|
close(thread.drainChan)
|
|
drainedThreads = append(drainedThreads, thread)
|
|
go func(thread *phpThread) {
|
|
thread.state.waitFor(stateYielding)
|
|
ready.Done()
|
|
}(thread)
|
|
}
|
|
worker.threadMutex.RUnlock()
|
|
}
|
|
ready.Wait()
|
|
|
|
return drainedThreads
|
|
}
|
|
|
|
func drainWatcher() {
|
|
if watcherIsEnabled {
|
|
watcher.DrainWatcher()
|
|
}
|
|
}
|
|
|
|
// RestartWorkers attempts to restart all workers gracefully
|
|
func RestartWorkers() {
|
|
// disallow scaling threads while restarting workers
|
|
scalingMu.Lock()
|
|
defer scalingMu.Unlock()
|
|
|
|
threadsToRestart := drainWorkerThreads()
|
|
|
|
for _, thread := range threadsToRestart {
|
|
thread.drainChan = make(chan struct{})
|
|
thread.state.set(stateReady)
|
|
}
|
|
}
|
|
|
|
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
|
|
directoriesToWatch := []string{}
|
|
for _, w := range workerOpts {
|
|
directoriesToWatch = append(directoriesToWatch, w.watch...)
|
|
}
|
|
return directoriesToWatch
|
|
}
|
|
|
|
func (worker *worker) attachThread(thread *phpThread) {
|
|
worker.threadMutex.Lock()
|
|
worker.threads = append(worker.threads, thread)
|
|
worker.threadMutex.Unlock()
|
|
}
|
|
|
|
func (worker *worker) detachThread(thread *phpThread) {
|
|
worker.threadMutex.Lock()
|
|
for i, t := range worker.threads {
|
|
if t == thread {
|
|
worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
worker.threadMutex.Unlock()
|
|
}
|
|
|
|
func (worker *worker) countThreads() int {
|
|
worker.threadMutex.RLock()
|
|
l := len(worker.threads)
|
|
worker.threadMutex.RUnlock()
|
|
|
|
return l
|
|
}
|
|
|
|
func (worker *worker) handleRequest(fc *frankenPHPContext) {
|
|
metrics.StartWorkerRequest(worker.name)
|
|
|
|
// dispatch requests to all worker threads in order
|
|
worker.threadMutex.RLock()
|
|
for _, thread := range worker.threads {
|
|
select {
|
|
case thread.requestChan <- fc:
|
|
worker.threadMutex.RUnlock()
|
|
<-fc.done
|
|
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
|
|
return
|
|
default:
|
|
// thread is busy, continue
|
|
}
|
|
}
|
|
worker.threadMutex.RUnlock()
|
|
|
|
// if no thread was available, mark the request as queued and apply the scaling strategy
|
|
metrics.QueuedWorkerRequest(worker.name)
|
|
for {
|
|
select {
|
|
case worker.requestChan <- fc:
|
|
metrics.DequeuedWorkerRequest(worker.name)
|
|
<-fc.done
|
|
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
|
|
return
|
|
case scaleChan <- fc:
|
|
// the request has triggered scaling, continue to wait for a thread
|
|
case <-timeoutChan(maxWaitTime):
|
|
metrics.DequeuedWorkerRequest(worker.name)
|
|
// the request has timed out stalling
|
|
fc.reject(504, "Gateway Timeout")
|
|
return
|
|
}
|
|
}
|
|
}
|