This commit is contained in:
Kévin Dunglas
2025-09-13 13:57:37 +02:00
parent 0d1daf6685
commit 26a84c5700
2 changed files with 27 additions and 42 deletions

View File

@@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) {
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
go func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r))
}
}()
for {
rq := externalWorker.ProvideRequest()
for {
var rq *WorkerRequest
func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r))
rq = nil
}
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name))
w.requestChan <- fc
if rq.Done != nil {
go func() {
<-fc.done
close(rq.Done)
}()
rq = externalWorker.ProvideRequest()
}()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
// Queue the request and wait for completion if Done channel was provided
w.requestChan <- fc
if rq.Done != nil {
go func() {
<-fc.done
close(rq.Done)
}()
}
}
}
}()
}
}

View File

@@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error {
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
startExternalWorkerPipe(w, workerThread.externalWorker, thread)
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()