Files
frankenphp/worker.go
Alliballibaba2 f592e0f47b refactor: decouple worker threads from non-worker threads (#1137)
* Decouple workers.

* Moves code to separate file.

* Cleans up the exponential backoff.

* Initial working implementation.

* Refactors php threads to take callbacks.

* Cleanup.

* Cleanup.

* Cleanup.

* Cleanup.

* Adjusts watcher logic.

* Adjusts the watcher logic.

* Fix opcache_reset race condition.

* Fixing merge conflicts and formatting.

* Prevents overlapping of TSRM reservation and script execution.

* Adjustments as suggested by @dunglas.

* Adds error assertions.

* Adds comments.

* Removes logs and explicitly compares to C.false.

* Resets check.

* Adds cast for safety.

* Fixes waitgroup overflow.

* Resolves waitgroup race condition on startup.

* Moves worker request logic to worker.go.

* Removes defer.

* Removes call from go to c.

* Fixes merge conflict.

* Adds fibers test back in.

* Refactors new thread loop approach.

* Removes redundant check.

* Adds compareAndSwap.

* Refactor: removes global waitgroups and uses a 'thread state' abstraction instead.

* Removes unnecessary method.

* Updates comment.

* Removes unnecessary booleans.

* test

* First state machine steps.

* Splits threads.

* Minimal working implementation with broken tests.

* Fixes tests.

* Refactoring.

* Fixes merge conflicts.

* Formatting

* C formatting.

* More cleanup.

* Allows for clean state transitions.

* Adds state tests.

* Adds support for thread transitioning.

* Fixes the testdata path.

* Formatting.

* Allows transitioning back to inactive state.

* Fixes go linting.

* Formatting.

* Removes duplication.

* Applies suggestions by @dunglas

* Removes redundant check.

* Locks the handler on restart.

* Removes unnecessary log.

* Changes Unpin() logic as suggested by @withinboredom

* Adds suggestions by @dunglas and resolves TODO.

* Makes restarts fully safe.

* Will make the initial startup fail even if the watcher is enabled (as is currently the case)

* Also adds compareAndSwap to the test.

* Adds comment.

* Prevents panic on initial watcher startup.
2024-12-17 11:28:51 +01:00

169 lines
3.8 KiB
Go

package frankenphp
// #include "frankenphp.h"
import "C"
import (
"fmt"
"github.com/dunglas/frankenphp/internal/fastabs"
"net/http"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/watcher"
)
// represents a worker script and can have many threads assigned to it
type worker struct {
fileName string
num int
env PreparedEnv
requestChan chan *http.Request
threads []*phpThread
threadMutex sync.RWMutex
}
var (
workers map[string]*worker
watcherIsEnabled bool
)
func initWorkers(opt []workerOpt) error {
workers = make(map[string]*worker, len(opt))
workersReady := sync.WaitGroup{}
directoriesToWatch := getDirectoriesToWatch(opt)
watcherIsEnabled = len(directoriesToWatch) > 0
for _, o := range opt {
worker, err := newWorker(o)
worker.threads = make([]*phpThread, 0, o.num)
workersReady.Add(o.num)
if err != nil {
return err
}
for i := 0; i < worker.num; i++ {
thread := getInactivePHPThread()
convertToWorkerThread(thread, worker)
go func() {
thread.state.waitFor(stateReady)
workersReady.Done()
}()
}
}
workersReady.Wait()
if !watcherIsEnabled {
return nil
}
if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
return err
}
return nil
}
func newWorker(o workerOpt) (*worker, error) {
absFileName, err := fastabs.FastAbs(o.fileName)
if err != nil {
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
}
if o.env == nil {
o.env = make(PreparedEnv, 1)
}
o.env["FRANKENPHP_WORKER\x00"] = "1"
w := &worker{
fileName: absFileName,
num: o.num,
env: o.env,
requestChan: make(chan *http.Request),
}
workers[absFileName] = w
return w, nil
}
func drainWorkers() {
watcher.DrainWatcher()
}
func restartWorkers() {
ready := sync.WaitGroup{}
threadsToRestart := make([]*phpThread, 0)
for _, worker := range workers {
worker.threadMutex.RLock()
ready.Add(len(worker.threads))
for _, thread := range worker.threads {
if !thread.state.requestSafeStateChange(stateRestarting) {
// no state change allowed = shutdown
continue
}
close(thread.drainChan)
threadsToRestart = append(threadsToRestart, thread)
go func(thread *phpThread) {
thread.state.waitFor(stateYielding)
ready.Done()
}(thread)
}
worker.threadMutex.RUnlock()
}
ready.Wait()
for _, thread := range threadsToRestart {
thread.drainChan = make(chan struct{})
thread.state.set(stateReady)
}
}
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
directoriesToWatch := []string{}
for _, w := range workerOpts {
directoriesToWatch = append(directoriesToWatch, w.watch...)
}
return directoriesToWatch
}
func (worker *worker) attachThread(thread *phpThread) {
worker.threadMutex.Lock()
worker.threads = append(worker.threads, thread)
worker.threadMutex.Unlock()
}
func (worker *worker) detachThread(thread *phpThread) {
worker.threadMutex.Lock()
for i, t := range worker.threads {
if t == thread {
worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
break
}
}
worker.threadMutex.Unlock()
}
func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
metrics.StartWorkerRequest(fc.scriptFilename)
// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
for _, thread := range worker.threads {
select {
case thread.requestChan <- r:
worker.threadMutex.RUnlock()
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
return
default:
}
}
worker.threadMutex.RUnlock()
// if no thread was available, fan the request out to all threads
// TODO: theoretically there could be autoscaling of threads here
worker.requestChan <- r
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
}