diff --git a/semaphore_admission.go b/semaphore-admission.go similarity index 58% rename from semaphore_admission.go rename to semaphore-admission.go index 65700e10..26e95f8e 100644 --- a/semaphore_admission.go +++ b/semaphore-admission.go @@ -7,6 +7,7 @@ import ( ) func acquireSemaphoreWithAdmissionControl( + ctx context.Context, sem *semaphore.Weighted, scaleChan chan *frankenPHPContext, fc *frankenPHPContext, @@ -16,8 +17,8 @@ func acquireSemaphoreWithAdmissionControl( } if maxWaitTime > 0 && scaleChan != nil { - ctx, cancel := context.WithTimeout(context.Background(), minStallTime) - err := sem.Acquire(ctx, 1) + ct, cancel := context.WithTimeout(ctx, minStallTime) + err := sem.Acquire(ct, 1) cancel() if err != nil { @@ -26,22 +27,30 @@ func acquireSemaphoreWithAdmissionControl( default: } - ctx, cancel := context.WithTimeout(context.Background(), maxWaitTime) + ctx, cancel := context.WithTimeout(ctx, maxWaitTime) defer cancel() if err := sem.Acquire(ctx, 1); err != nil { return ErrMaxWaitTimeExceeded } } - } else if maxWaitTime > 0 { - ctx, cancel := context.WithTimeout(context.Background(), maxWaitTime) + + return nil + } + + if maxWaitTime > 0 { + ctx, cancel := context.WithTimeout(ctx, maxWaitTime) defer cancel() if err := sem.Acquire(ctx, 1); err != nil { return ErrMaxWaitTimeExceeded } - } else if scaleChan != nil { - ctx, cancel := context.WithTimeout(context.Background(), minStallTime) + + return nil + } + + if scaleChan != nil { + ctx, cancel := context.WithTimeout(ctx, minStallTime) err := sem.Acquire(ctx, 1) cancel() @@ -51,14 +60,15 @@ func acquireSemaphoreWithAdmissionControl( default: } - if err := sem.Acquire(context.Background(), 1); err != nil { + if err := sem.Acquire(ctx, 1); err != nil { return ErrMaxWaitTimeExceeded } } - } else { - if err := sem.Acquire(context.Background(), 1); err != nil { - return ErrMaxWaitTimeExceeded - } + return nil + } + + if err := sem.Acquire(ctx, 1); err != nil { + return ErrMaxWaitTimeExceeded } return nil diff --git a/threadregular.go b/threadregular.go index 141e68a9..1580b47e 100644 --- a/threadregular.go +++ b/threadregular.go @@ -113,7 +113,7 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() metrics.QueuedRequest() - if err := acquireSemaphoreWithAdmissionControl(regularSemaphore, scaleChan, ch.frankenPHPContext); err != nil { + if err := acquireSemaphoreWithAdmissionControl(ch.ctx, regularSemaphore, scaleChan, ch.frankenPHPContext); err != nil { ch.frankenPHPContext.reject(err) metrics.StopRequest() return err diff --git a/worker.go b/worker.go index 13e655a6..1efdab76 100644 --- a/worker.go +++ b/worker.go @@ -258,7 +258,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { workerScaleChan = nil } - if err := acquireSemaphoreWithAdmissionControl(worker.semaphore, workerScaleChan, ch.frankenPHPContext); err != nil { + if err := acquireSemaphoreWithAdmissionControl(ch.ctx, worker.semaphore, workerScaleChan, ch.frankenPHPContext); err != nil { metrics.DequeuedWorkerRequest(worker.name) ch.frankenPHPContext.reject(err) metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))