diff --git a/threadFramework.go b/threadFramework.go index 6a3315a6..ff6032d5 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) { // startExternalWorkerPipe creates a pipe from an external worker to the main worker. func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { - go func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r)) - } - }() + for { + rq := externalWorker.ProvideRequest() - for { - var rq *WorkerRequest - func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r)) - rq = nil - } + if rq == nil || rq.Request == nil { + logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) + continue + } + + r := rq.Request + fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) + if err != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) + continue + } + + if fc, ok := fromContext(fr.Context()); ok { + fc.responseWriter = rq.Response + + // Queue the request and wait for completion if Done channel was provided + logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name)) + + w.requestChan <- fc + if rq.Done != nil { + go func() { + <-fc.done + close(rq.Done) }() - rq = externalWorker.ProvideRequest() - }() - - if rq == nil || rq.Request == nil { - logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) - continue - } - - r := rq.Request - fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) - if err != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) - continue - } - - if fc, ok := fromContext(fr.Context()); ok { - fc.responseWriter = rq.Response - - // Queue the request and wait for completion if Done channel was provided - w.requestChan <- fc - if rq.Done != nil { - go func() { - <-fc.done - close(rq.Done) - }() - } } } - }() + } } diff --git a/worker.go b/worker.go index 39417e65..dcc07444 100644 --- a/worker.go +++ b/worker.go @@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error { // create a pipe from the external worker to the main worker // note: this is locked to the initial thread size the external worker requested if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil { - startExternalWorkerPipe(w, workerThread.externalWorker, thread) + go startExternalWorkerPipe(w, workerThread.externalWorker, thread) } workersReady.Done() }()