Files
frankenphp/threadFramework.go
Rob Landers 52df300f86 feat: custom workers initial support (#1795)
* create a simple thread framework

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add tests

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix comment

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* remove mention of an old function that no longer exists

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* simplify providing a request

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* satisfy linter

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add error handling and handle shutdowns

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add tests

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* pipes are tied to workers, not threads

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix test

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add a way to detect when a request is completed

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* we never shutdown workers or remove them, so we do not need this

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* add more comments

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* Simplify modular threads (#1874)

* Simplify

* remove unused variable

* log thread index

* feat: allow passing parameters to the PHP callback and accessing its return value (#1881)

* fix formatting

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix test compilation

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* fix segfaults

Signed-off-by: Robert Landers <landers.robert@gmail.com>

* Update frankenphp.c

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

---------

Signed-off-by: Robert Landers <landers.robert@gmail.com>
Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
2025-09-18 09:21:49 +02:00

102 lines
4.3 KiB
Go

package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
)
// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard
// http.Request that will be conferred to the underlying worker script.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
// These methods are only called once at startup, so register them in an init() function.
//
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type WorkerExtension interface {
Name() string
FileName() string
Env() PreparedEnv
GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
ProvideRequest() *WorkerRequest[any, any]
}
// EXPERIMENTAL
type WorkerRequest[P any, R any] struct {
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
// CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback
CallbackParameters P
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
AfterFunc func(callbackReturn R)
}
var externalWorkers = make(map[string]WorkerExtension)
var externalWorkerMutex sync.Mutex
// EXPERIMENTAL
func RegisterExternalWorker(worker WorkerExtension) {
externalWorkerMutex.Lock()
defer externalWorkerMutex.Unlock()
externalWorkers[worker.Name()] = worker
}
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
for {
rq := externalWorker.ProvideRequest()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
fc.handlerParameters = rq.CallbackParameters
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
w.requestChan <- fc
if rq.AfterFunc != nil {
go func() {
<-fc.done
if rq.AfterFunc != nil {
rq.AfterFunc(fc.handlerReturn)
}
}()
}
}
}
}