package frankenphp // #include "frankenphp.h" import "C" import ( "context" "log/slog" "path/filepath" "time" "unsafe" ) // representation of a thread assigned to a worker script // executes the PHP worker script in a loop // implements the threadHandler interface type workerThread struct { state *threadState thread *phpThread worker *worker dummyFrankenPHPContext *frankenPHPContext dummyContext context.Context workerFrankenPHPContext *frankenPHPContext workerContext context.Context backoff *exponentialBackoff isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet } func convertToWorkerThread(thread *phpThread, worker *worker) { thread.setHandler(&workerThread{ state: thread.state, thread: thread, worker: worker, backoff: &exponentialBackoff{ maxBackoff: 1 * time.Second, minBackoff: 100 * time.Millisecond, maxConsecutiveFailures: worker.maxConsecutiveFailures, }, }) worker.attachThread(thread) } // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *workerThread) beforeScriptExecution() string { switch handler.state.get() { case stateTransitionRequested: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) return handler.thread.transitionToNewHandler() case stateRestarting: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.state.set(stateYielding) handler.state.waitFor(stateReady, stateShuttingDown) return handler.beforeScriptExecution() case stateReady, stateTransitionComplete: if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) } setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) // signal to stop return "" } panic("unexpected state: " + handler.state.name()) } func (handler *workerThread) afterScriptExecution(exitStatus int) { tearDownWorkerScript(handler, exitStatus) } func (handler *workerThread) frankenPHPContext() *frankenPHPContext { if handler.workerFrankenPHPContext != nil { return handler.workerFrankenPHPContext } return handler.dummyFrankenPHPContext } func (handler *workerThread) context() context.Context { if handler.workerContext != nil { return handler.workerContext } return handler.dummyContext } func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } func setupWorkerScript(handler *workerThread, worker *worker) { handler.backoff.wait() metrics.StartWorker(worker.name) if handler.state.is(stateReady) { metrics.ReadyWorker(handler.worker.name) } // Create a dummy request to set up the worker fc, err := newDummyContext( filepath.Base(worker.fileName), worker.requestOptions..., ) if err != nil { panic(err) } ctx := context.WithValue(globalCtx, contextKey, fc) fc.worker = worker handler.dummyFrankenPHPContext = fc handler.dummyContext = ctx handler.isBootingScript = true clearSandboxedEnv(handler.thread) if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } } func tearDownWorkerScript(handler *workerThread, exitStatus int) { worker := handler.worker handler.dummyFrankenPHPContext = nil handler.dummyContext = nil // if the worker request is not nil, the script might have crashed // make sure to close the worker request context if handler.workerFrankenPHPContext != nil { handler.workerFrankenPHPContext.closeContext() handler.workerFrankenPHPContext = nil handler.workerContext = nil } // on exit status 0 we just run the worker script again if exitStatus == 0 && !handler.isBootingScript { metrics.StopWorker(worker.name, StopReasonRestart) handler.backoff.recordSuccess() if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) } return } // worker has thrown a fatal error or has not reached frankenphp_handle_request metrics.StopWorker(worker.name, StopReasonCrash) if !handler.isBootingScript { // fatal error (could be due to exit(1), timeouts, etc.) if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) } return } if globalLogger.Enabled(globalCtx, slog.LevelError) { globalLogger.LogAttrs(globalCtx, slog.LevelError, "worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } // panic after exponential backoff if the worker has never reached frankenphp_handle_request if handler.backoff.recordFailure() { if !watcherIsEnabled && !handler.state.is(stateReady) { panic("too many consecutive worker failures") } if globalLogger.Enabled(globalCtx, slog.LevelWarn) { globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.failureCount)) } } } // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script. func (handler *workerThread) waitForWorkerRequest() (bool, any) { // unpin any memory left over from previous requests handler.thread.Unpin() if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "waiting for request", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) } // Clear the first dummy request created to initialize the worker if handler.isBootingScript { handler.isBootingScript = false if !C.frankenphp_shutdown_dummy_request() { panic("Not in CGI context") } } // worker threads are 'ready' after they first reach frankenphp_handle_request() // 'stateTransitionComplete' is only true on the first boot of the worker script, // while 'isBootingScript' is true on every boot of the worker script if handler.state.is(stateTransitionComplete) { metrics.ReadyWorker(handler.worker.name) handler.state.set(stateReady) } handler.state.markAsWaiting(true) var requestCH contextHolder select { case <-handler.thread.drainChan: if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) } // flush the opcache when restarting due to watcher or admin api // note: this is done right before frankenphp_handle_request() returns 'false' if handler.state.is(stateRestarting) { C.frankenphp_reset_opcache() } return false, nil case requestCH = <-handler.thread.requestChan: case requestCH = <-handler.worker.requestChan: } handler.workerContext = requestCH.ctx handler.workerFrankenPHPContext = requestCH.frankenPHPContext handler.state.markAsWaiting(false) if globalLogger.Enabled(requestCH.ctx, slog.LevelDebug) { if handler.workerFrankenPHPContext.request == nil { globalLogger.LogAttrs(requestCH.ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) } else { globalLogger.LogAttrs(requestCH.ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", handler.workerFrankenPHPContext.request.RequestURI)) } } return true, handler.workerFrankenPHPContext.handlerParameters } // go_frankenphp_worker_handle_request_start is called at the start of every php request served. // //export go_frankenphp_worker_handle_request_start func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, unsafe.Pointer) { handler := phpThreads[threadIndex].handler.(*workerThread) hasRequest, parameters := handler.waitForWorkerRequest() if parameters != nil { var ptr unsafe.Pointer switch p := parameters.(type) { case unsafe.Pointer: ptr = p default: ptr = PHPValue(p) } handler.thread.Pin(ptr) return C.bool(hasRequest), ptr } return C.bool(hasRequest), nil } // go_frankenphp_finish_worker_request is called at the end of every php request served. // //export go_frankenphp_finish_worker_request func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) { thread := phpThreads[threadIndex] ctx := thread.context() fc := ctx.Value(contextKey).(*frankenPHPContext) if retval != nil { r, err := GoValue[any](unsafe.Pointer(retval)) if err != nil && globalLogger.Enabled(ctx, slog.LevelError) { globalLogger.LogAttrs(ctx, slog.LevelError, "cannot convert return value", slog.Any("error", err), slog.Int("thread", thread.threadIndex)) } fc.handlerReturn = r } fc.closeContext() thread.handler.(*workerThread).workerFrankenPHPContext = nil thread.handler.(*workerThread).workerContext = nil if globalLogger.Enabled(ctx, slog.LevelDebug) { if fc.request == nil { fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex)) } else { fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI)) } } } // when frankenphp_finish_request() is directly called from PHP // //export go_frankenphp_finish_php_request func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] fc := thread.frankenPHPContext() fc.closeContext() ctx := thread.context() if fc.logger.Enabled(ctx, slog.LevelDebug) { fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI)) } }