diff --git a/caddy/app.go b/caddy/app.go index ad648efd..02dcf170 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/caddyserver/caddy/v2" @@ -18,6 +19,22 @@ import ( "github.com/dunglas/frankenphp/internal/fastabs" ) +var ( + options []frankenphp.Option + optionsMU sync.RWMutex +) + +// EXPERIMENTAL: RegisterWorkers provides a way for extensions to register frankenphp.Workers +func RegisterWorkers(name, fileName string, num int, wo ...frankenphp.WorkerOption) frankenphp.Workers { + w, opt := frankenphp.WithExtensionWorkers(name, fileName, num, wo...) + + optionsMU.Lock() + options = append(options, opt) + optionsMU.Unlock() + + return w +} + // FrankenPHPApp represents the global "frankenphp" directive in the Caddyfile // it's responsible for starting up the global PHP instance and all threads // @@ -118,6 +135,11 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), } + + optionsMU.RLock() + opts = append(opts, options...) + optionsMU.RUnlock() + for _, w := range append(f.Workers) { workerOpts := []frankenphp.WorkerOption{ frankenphp.WithWorkerEnv(w.Env), @@ -151,6 +173,10 @@ func (f *FrankenPHPApp) Stop() error { f.NumThreads = 0 f.MaxWaitTime = 0 + optionsMU.Lock() + options = nil + optionsMU.Unlock() + return nil } diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index bbe192a3..7ca641fd 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -965,7 +965,7 @@ func TestMaxWaitTime(t *testing.T) { for range 10 { go func() { statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10", t) - if statusCode == http.StatusGatewayTimeout { + if statusCode == http.StatusServiceUnavailable { success.Store(true) } wg.Done() @@ -973,7 +973,7 @@ func TestMaxWaitTime(t *testing.T) { } wg.Wait() - require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") + require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status") } func TestMaxWaitTimeWorker(t *testing.T) { @@ -1012,23 +1012,26 @@ func TestMaxWaitTimeWorker(t *testing.T) { for range 10 { go func() { statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10&iteration=1", t) - if statusCode == http.StatusGatewayTimeout { + if statusCode == http.StatusServiceUnavailable { success.Store(true) } wg.Done() }() } wg.Wait() - require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") + require.True(t, success.Load(), "At least one request should have failed with a 503 Service Unavailable status") // Fetch metrics resp, err := http.Get("http://localhost:2999/metrics") require.NoError(t, err, "failed to fetch metrics") - defer resp.Body.Close() + t.Cleanup(func() { + require.NoError(t, resp.Body.Close()) + }) // Read and parse metrics metrics := new(bytes.Buffer) _, err = metrics.ReadFrom(resp.Body) + require.NoError(t, err) expectedMetrics := ` # TYPE frankenphp_worker_queue_depth gauge diff --git a/caddy/module.go b/caddy/module.go index 8e6ce80e..e4b2f9e8 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -2,6 +2,7 @@ package caddy import ( "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -192,8 +193,11 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c frankenphp.WithOriginalRequest(&origReq), frankenphp.WithWorkerName(workerName), ) + if err != nil { + return caddyhttp.Error(http.StatusInternalServerError, err) + } - if err = frankenphp.ServeHTTP(w, fr); err != nil { + if err = frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, &frankenphp.ErrRejected{}) { return caddyhttp.Error(http.StatusInternalServerError, err) } diff --git a/context.go b/context.go index b039feba..543bb4a7 100644 --- a/context.go +++ b/context.go @@ -2,6 +2,8 @@ package frankenphp import ( "context" + "errors" + "fmt" "log/slog" "net/http" "os" @@ -117,23 +119,25 @@ func (fc *frankenPHPContext) closeContext() { } // validate checks if the request should be outright rejected -func (fc *frankenPHPContext) validate() bool { +func (fc *frankenPHPContext) validate() error { if strings.Contains(fc.request.URL.Path, "\x00") { - fc.rejectBadRequest("Invalid request path") + fc.reject(ErrInvalidRequestPath) - return false + return ErrInvalidRequestPath } contentLengthStr := fc.request.Header.Get("Content-Length") if contentLengthStr != "" { if contentLength, err := strconv.Atoi(contentLengthStr); err != nil || contentLength < 0 { - fc.rejectBadRequest("invalid Content-Length header: " + contentLengthStr) + e := fmt.Errorf("%w: %q", ErrInvalidContentLengthHeader, contentLengthStr) - return false + fc.reject(e) + + return e } } - return true + return nil } func (fc *frankenPHPContext) clientHasClosed() bool { @@ -149,16 +153,22 @@ func (fc *frankenPHPContext) clientHasClosed() bool { } } -// reject sends a response with the given status code and message -func (fc *frankenPHPContext) reject(statusCode int, message string) { +// reject sends a response with the given status code and error +func (fc *frankenPHPContext) reject(err error) { if fc.isDone { return } + re := &ErrRejected{} + if !errors.As(err, re) { + // Should never happen + panic("only instance of ErrRejected can be passed to reject") + } + rw := fc.responseWriter if rw != nil { - rw.WriteHeader(statusCode) - _, _ = rw.Write([]byte(message)) + rw.WriteHeader(re.status) + _, _ = rw.Write([]byte(err.Error())) if f, ok := rw.(http.Flusher); ok { f.Flush() @@ -167,7 +177,3 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) { fc.closeContext() } - -func (fc *frankenPHPContext) rejectBadRequest(message string) { - fc.reject(http.StatusBadRequest, message) -} diff --git a/frankenphp.go b/frankenphp.go index ae11ec10..8d3263b8 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -52,6 +52,10 @@ var ( ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") + ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest} + ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest} + ErrMaxWaitTimeExceeded = ErrRejected{"maximum request handling time exceeded", http.StatusServiceUnavailable} + isRunning bool onServerShutdown []func() @@ -63,34 +67,43 @@ var ( maxWaitTime time.Duration ) +type ErrRejected struct { + message string + status int +} + +func (e ErrRejected) Error() string { + return e.message +} + type syslogLevel int const ( - emerg syslogLevel = iota // system is unusable - alert // action must be taken immediately - crit // critical conditions - err // error conditions - warning // warning conditions - notice // normal but significant condition - info // informational - debug // debug-level messages + syslogLevelEmerg syslogLevel = iota // system is unusable + syslogLevelAlert // action must be taken immediately + syslogLevelCrit // critical conditions + syslogLevelErr // error conditions + syslogLevelWarn // warning conditions + syslogLevelNotice // normal but significant condition + syslogLevelInfo // informational + syslogLevelDebug // debug-level messages ) func (l syslogLevel) String() string { switch l { - case emerg: + case syslogLevelEmerg: return "emerg" - case alert: + case syslogLevelAlert: return "alert" - case crit: + case syslogLevelCrit: return "crit" - case err: + case syslogLevelErr: return "err" - case warning: + case syslogLevelWarn: return "warning" - case notice: + case syslogLevelNotice: return "notice" - case debug: + case syslogLevelDebug: return "debug" default: return "info" @@ -210,11 +223,6 @@ func Init(options ...Option) error { registerExtensions() - // add registered external workers - for _, ew := range extensionWorkers { - options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...)) - } - opt := &opt{} for _, o := range options { if err := o(opt); err != nil { @@ -336,20 +344,17 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error fc.responseWriter = responseWriter - if !fc.validate() { - return nil + if err := fc.validate(); err != nil { + return err } // Detect if a worker is available to handle this request if fc.worker != nil { - fc.worker.handleRequest(fc) - - return nil + return fc.worker.handleRequest(fc) } // If no worker was available, send the request to non-worker threads - handleRequestWithRegularPHPThreads(fc) - return nil + return handleRequestWithRegularPHPThreads(fc) } //export go_ub_write @@ -566,19 +571,19 @@ func go_log(message *C.char, level C.int) { m := C.GoString(message) var le syslogLevel - if level < C.int(emerg) || level > C.int(debug) { - le = info + if level < C.int(syslogLevelEmerg) || level > C.int(syslogLevelDebug) { + le = syslogLevelInfo } else { le = syslogLevel(level) } switch le { - case emerg, alert, crit, err: + case syslogLevelEmerg, syslogLevelAlert, syslogLevelCrit, syslogLevelErr: logger.LogAttrs(context.Background(), slog.LevelError, m, slog.String("syslog_level", syslogLevel(level).String())) - case warning: + case syslogLevelWarn: logger.LogAttrs(context.Background(), slog.LevelWarn, m, slog.String("syslog_level", syslogLevel(level).String())) - case debug: + case syslogLevelDebug: logger.LogAttrs(context.Background(), slog.LevelDebug, m, slog.String("syslog_level", syslogLevel(level).String())) default: diff --git a/frankenphp_test.go b/frankenphp_test.go index ee13d196..5450da6c 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -78,7 +78,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * } err := frankenphp.Init(initOpts...) - require.Nil(t, err) + require.NoError(t, err) defer frankenphp.Shutdown() handler := func(w http.ResponseWriter, r *http.Request) { @@ -86,7 +86,9 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * assert.NoError(t, err) err = frankenphp.ServeHTTP(w, req) - assert.NoError(t, err) + if err != nil && !errors.As(err, &frankenphp.ErrRejected{}) { + assert.Fail(t, fmt.Sprintf("Received unexpected error:\n%+v", err)) + } } var ts *httptest.Server @@ -109,6 +111,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * func testRequest(req *http.Request, handler func(http.ResponseWriter, *http.Request), t *testing.T) (string, *http.Response) { t.Helper() + w := httptest.NewRecorder() handler(w, req) resp := w.Result() @@ -988,7 +991,7 @@ func FuzzRequest(f *testing.F) { // The response status must be 400 if the request path contains null bytes if strings.Contains(req.URL.Path, "\x00") { assert.Equal(t, 400, resp.StatusCode) - assert.Contains(t, body, "Invalid request path") + assert.Contains(t, body, "invalid request path") return } diff --git a/options.go b/options.go index befe3a7f..9d58125c 100644 --- a/options.go +++ b/options.go @@ -35,6 +35,7 @@ type workerOpt struct { env PreparedEnv watch []string maxConsecutiveFailures int + extensionWorkers *extensionWorkers onThreadReady func(int) onThreadShutdown func(int) onServerStartup func() @@ -67,7 +68,7 @@ func WithMetrics(m Metrics) Option { } // WithWorkers configures the PHP workers to start -func WithWorkers(name string, fileName string, num int, options ...WorkerOption) Option { +func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option { return func(o *opt) error { worker := workerOpt{ name: name, @@ -90,6 +91,54 @@ func WithWorkers(name string, fileName string, num int, options ...WorkerOption) } } +// EXPERIMENTAL: WithExtensionWorkers allow extensions to create workers. +// +// A worker script with the provided name, fileName and thread count will be registered, along with additional +// configuration through WorkerOptions. +// +// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. +// +// Extension workers receive the lowest priority when determining thread allocations. If the requested number of threads +// cannot be allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate +// more total threads). Don't be greedy. +func WithExtensionWorkers(name, fileName string, numThreads int, options ...WorkerOption) (Workers, Option) { + w := &extensionWorkers{ + name: name, + fileName: fileName, + num: numThreads, + } + + w.options = append(options, withExtensionWorkers(w)) + + return w, WithWorkers(w.name, w.fileName, w.num, w.options...) +} + +// WithLogger configures the global logger to use. +func WithLogger(l *slog.Logger) Option { + return func(o *opt) error { + o.logger = l + + return nil + } +} + +// WithPhpIni configures user defined PHP ini settings. +func WithPhpIni(overrides map[string]string) Option { + return func(o *opt) error { + o.phpIni = overrides + return nil + } +} + +// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread. +func WithMaxWaitTime(maxWaitTime time.Duration) Option { + return func(o *opt) error { + o.maxWaitTime = maxWaitTime + + return nil + } +} + // WithWorkerEnv sets environment variables for the worker func WithWorkerEnv(env map[string]string) WorkerOption { return func(w *workerOpt) error { @@ -154,27 +203,9 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } } -// WithLogger configures the global logger to use. -func WithLogger(l *slog.Logger) Option { - return func(o *opt) error { - o.logger = l - - return nil - } -} - -// WithPhpIni configures user defined PHP ini settings. -func WithPhpIni(overrides map[string]string) Option { - return func(o *opt) error { - o.phpIni = overrides - return nil - } -} - -// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread. -func WithMaxWaitTime(maxWaitTime time.Duration) Option { - return func(o *opt) error { - o.maxWaitTime = maxWaitTime +func withExtensionWorkers(w *extensionWorkers) WorkerOption { + return func(wo *workerOpt) error { + wo.extensionWorkers = w return nil } diff --git a/threadregular.go b/threadregular.go index 88cef7e7..64accc60 100644 --- a/threadregular.go +++ b/threadregular.go @@ -84,14 +84,15 @@ func (handler *regularThread) afterRequest() { handler.requestContext = nil } -func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { +func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) error { metrics.StartRequest() select { case regularRequestChan <- fc: // a thread was available to handle the request immediately <-fc.done metrics.StopRequest() - return + + return nil default: // no thread was available } @@ -104,14 +105,17 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.DequeuedRequest() <-fc.done metrics.StopRequest() - return + + return nil case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling metrics.DequeuedRequest() - fc.reject(504, "Gateway Timeout") - return + + fc.reject(ErrMaxWaitTimeExceeded) + + return ErrMaxWaitTimeExceeded } } } diff --git a/worker.go b/worker.go index 0f9aa087..cd592f6e 100644 --- a/worker.go +++ b/worker.go @@ -136,6 +136,10 @@ func newWorker(o workerOpt) (*worker, error) { onThreadShutdown: o.onThreadShutdown, } + if o.extensionWorkers != nil { + o.extensionWorkers.internalWorker = w + } + return w, nil } @@ -224,7 +228,7 @@ func (worker *worker) countThreads() int { return l } -func (worker *worker) handleRequest(fc *frankenPHPContext) { +func (worker *worker) handleRequest(fc *frankenPHPContext) error { metrics.StartWorkerRequest(worker.name) // dispatch requests to all worker threads in order @@ -235,7 +239,8 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { worker.threadMutex.RUnlock() <-fc.done metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) - return + + return nil default: // thread is busy, continue } @@ -250,14 +255,17 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.DequeuedWorkerRequest(worker.name) <-fc.done metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) - return + + return nil case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): - metrics.DequeuedWorkerRequest(worker.name) // the request has timed out stalling - fc.reject(504, "Gateway Timeout") - return + metrics.DequeuedWorkerRequest(worker.name) + + fc.reject(ErrMaxWaitTimeExceeded) + + return ErrMaxWaitTimeExceeded } } } diff --git a/workerextension.go b/workerextension.go index 743cc7f1..49334685 100644 --- a/workerextension.go +++ b/workerextension.go @@ -1,56 +1,30 @@ package frankenphp import ( - "errors" "net/http" ) -// EXPERIMENTAL: Worker allows you to register a worker where, instead of calling FrankenPHP handlers on -// frankenphp_handle_request(), the GetRequest method is called. -// -// You may provide an http.Request that will be conferred to the underlying worker script, -// or custom parameters that will be passed to frankenphp_handle_request(). -// -// After the execution of frankenphp_handle_request(), the return value WorkerRequest.AfterFunc will be called, -// with the optional return value of the callback passed as parameter. -// -// A worker script with the provided name, fileName and thread count will be registered, along with additional -// configuration through WorkerOptions. -// -// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. -// -// Extension workers receive the lowest priority when determining thread allocations. If MinThreads cannot be -// allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more -// total threads). Don't be greedy. -type Worker struct { - name string - fileName string - num int - options []WorkerOption +// EXPERIMENTAL: Workers allows you to register a worker. +type Workers interface { + // SendRequest calls the closure passed to frankenphp_handle_request() and updates the PHP context . + // The generated HTTP response will be written through the provided writer. + SendRequest(rw http.ResponseWriter, r *http.Request) error + // SendMessage calls the closure passed to frankenphp_handle_request(), passes message as a parameter, and returns the value produced by the closure. + SendMessage(message any, rw http.ResponseWriter) (any, error) + // NumThreads returns the number of available threads. + NumThreads() int } -var extensionWorkers = make(map[string]Worker) - -// EXPERIMENTAL: RegisterWorker registers an external worker. -// external workers are booted together with regular workers on server startup. -func RegisterWorker(worker Worker) error { - if _, exists := extensionWorkers[worker.name]; exists { - return errors.New("worker with this name is already registered: " + worker.name) - } - - extensionWorkers[worker.name] = worker - - return nil +type extensionWorkers struct { + name string + fileName string + num int + options []WorkerOption + internalWorker *worker } // EXPERIMENTAL: SendRequest sends an HTTP request to the worker and writes the response to the provided ResponseWriter. -func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error { - worker := getWorkerByName(w.name) - - if worker == nil { - return errors.New("worker not found: " + w.name) - } - +func (w *extensionWorkers) SendRequest(rw http.ResponseWriter, r *http.Request) error { fr, err := NewRequestWithContext( r, WithOriginalRequest(r), @@ -61,50 +35,22 @@ func (w Worker) SendRequest(rw http.ResponseWriter, r *http.Request) error { return err } - err = ServeHTTP(rw, fr) - - if err != nil { - return err - } - - return nil + return ServeHTTP(rw, fr) } -func (w Worker) NumThreads() int { - worker := getWorkerByName(w.name) - - if worker == nil { - return 0 - } - - return worker.countThreads() +func (w *extensionWorkers) NumThreads() int { + return w.internalWorker.countThreads() } // EXPERIMENTAL: SendMessage sends a message to the worker and waits for a response. -func (w Worker) SendMessage(message any, rw http.ResponseWriter) (any, error) { - internalWorker := getWorkerByName(w.name) - - if internalWorker == nil { - return nil, errors.New("worker not found: " + w.name) - } - +func (w *extensionWorkers) SendMessage(message any, rw http.ResponseWriter) (any, error) { fc := newFrankenPHPContext() fc.logger = logger - fc.worker = internalWorker + fc.worker = w.internalWorker fc.responseWriter = rw fc.handlerParameters = message - internalWorker.handleRequest(fc) + err := w.internalWorker.handleRequest(fc) - return fc.handlerReturn, nil -} - -// EXPERIMENTAL: NewWorker registers an external worker with the given options -func NewWorker(name string, fileName string, num int, options ...WorkerOption) Worker { - return Worker{ - name: name, - fileName: fileName, - num: num, - options: options, - } + return fc.handlerReturn, err } diff --git a/workerextension_test.go b/workerextension_test.go index cddbb854..1719cb03 100644 --- a/workerextension_test.go +++ b/workerextension_test.go @@ -9,14 +9,14 @@ import ( "github.com/stretchr/testify/require" ) -func TestWorkerExtension(t *testing.T) { +func TestWorkersExtension(t *testing.T) { readyWorkers := 0 shutdownWorkers := 0 serverStarts := 0 serverShutDowns := 0 - externalWorker := NewWorker( - "externalWorker", + externalWorkers, o := WithExtensionWorkers( + "extensionWorkers", "testdata/worker.php", 1, WithWorkerOnReady(func(id int) { @@ -33,20 +33,16 @@ func TestWorkerExtension(t *testing.T) { }), ) - assert.NoError(t, RegisterWorker(externalWorker)) - - require.NoError(t, Init()) - defer func() { - // Clean up external workers after test to avoid interfering with other tests - delete(extensionWorkers, externalWorker.name) + require.NoError(t, Init(o)) + t.Cleanup(func() { Shutdown() assert.Equal(t, 1, shutdownWorkers, "Worker shutdown hook should have been called") assert.Equal(t, 1, serverShutDowns, "Server shutdown hook should have been called") - }() + }) assert.Equal(t, readyWorkers, 1, "Worker thread should have called onReady()") assert.Equal(t, serverStarts, 1, "Server start hook should have been called") - assert.Equal(t, externalWorker.NumThreads(), 1, "NumThreads() should report 1 thread") + assert.Equal(t, externalWorkers.NumThreads(), 1, "NumThreads() should report 1 thread") // Create a test request req := httptest.NewRequest("GET", "https://example.com/test/?foo=bar", nil) @@ -54,7 +50,7 @@ func TestWorkerExtension(t *testing.T) { w := httptest.NewRecorder() // Inject the request into the worker through the extension - err := externalWorker.SendRequest(w, req) + err := externalWorkers.SendRequest(w, req) assert.NoError(t, err, "Sending request should not produce an error") resp := w.Result() @@ -67,38 +63,22 @@ func TestWorkerExtension(t *testing.T) { } func TestWorkerExtensionSendMessage(t *testing.T) { - externalWorker := NewWorker("externalWorker", "testdata/message-worker.php", 1) - assert.NoError(t, RegisterWorker(externalWorker)) + externalWorker, o := WithExtensionWorkers("extensionWorkers", "testdata/message-worker.php", 1) - // Clean up external workers after test to avoid interfering with other tests - defer func() { - delete(extensionWorkers, externalWorker.name) - }() - - err := Init() + err := Init(o) require.NoError(t, err) - defer Shutdown() + t.Cleanup(Shutdown) - result, err := externalWorker.SendMessage("Hello Worker", nil) - assert.NoError(t, err, "Sending message should not produce an error") + ret, err := externalWorker.SendMessage("Hello Workers", nil) + require.NoError(t, err) - switch v := result.(type) { - case string: - assert.Equal(t, "received message: Hello Worker", v) - default: - t.Fatalf("Expected result to be string, got %T", v) - } + assert.Equal(t, "received message: Hello Workers", ret) } func TestErrorIf2WorkersHaveSameName(t *testing.T) { - w := NewWorker("duplicateWorker", "testdata/worker.php", 1) - w2 := NewWorker("duplicateWorker", "testdata/worker2.php", 1) + _, o1 := WithExtensionWorkers("duplicateWorker", "testdata/worker.php", 1) + _, o2 := WithExtensionWorkers("duplicateWorker", "testdata/worker2.php", 1) - err := RegisterWorker(w) - require.NoError(t, err, "First registration should succeed") - - err = RegisterWorker(w2) - require.Error(t, err, "Second registration with duplicate name should fail") - // Clean up external workers after test to avoid interfering with other tests - extensionWorkers = make(map[string]Worker) + t.Cleanup(Shutdown) + require.Error(t, Init(o1, o2)) }