Compare commits

...

7 Commits

Author SHA1 Message Date
Robert Landers
8339f2d072 unwrap go routine 2024-06-02 17:18:56 +02:00
Robert Landers
e1df51097c prevent go from reusing threads 2024-06-02 17:18:08 +02:00
Robert Landers
cde930c5c5 remove C-Thread-Pool 2024-06-02 12:04:25 +02:00
Robert Landers
3a75120832 see: https://github.com/golang/go/issues/6883\#issuecomment-383800123 2024-06-02 12:02:00 +02:00
Robert Landers
9d843b9577 factor out thread pool 2024-06-02 12:01:49 +02:00
Robert Landers
a31abc4514 delete defunc code 2024-06-02 10:59:55 +02:00
Robert Landers
9f17ed6776 refactor fetch and execute into single function 2024-06-02 10:59:46 +02:00
11 changed files with 65 additions and 936 deletions

View File

@@ -32,7 +32,6 @@ jobs:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
LINTER_RULES_PATH: /
FILTER_REGEX_EXCLUDE: '.*C-Thread-Pool/.*'
MARKDOWN_CONFIG_FILE: .markdown-lint.yaml
VALIDATE_CPP: false
VALIDATE_JSCPD: false

View File

@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2016 Johan Hanssen Seferidis
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,70 +0,0 @@
[![GitHub Actions](https://github.com/Pithikos/C-Thread-Pool/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Pithikos/C-Thread-Pool/actions?query=workflow%3Atests+branch%3Amaster)
# C Thread Pool
This is a minimal but advanced threadpool implementation.
* ANCI C and POSIX compliant
* Pause/resume/wait as you like
* Simple easy-to-digest API
* Well tested
The threadpool is under MIT license. Notice that this project took a considerable amount of work and sacrifice of my free time and the reason I give it for free (even for commercial use) is so when you become rich and wealthy you don't forget about us open-source creatures of the night. Cheers!
If this project reduced your development time feel free to buy me a coffee.
[![Donate](https://www.paypal.com/en_US/i/btn/x-click-but21.gif)](https://www.paypal.me/seferidis)
## Run an example
The library is not precompiled so you have to compile it with your project. The thread pool
uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this:
gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
Then run the executable like this:
./example
## Basic usage
1. Include the header in your source file: `#include "thpool.h"`
2. Create a thread pool with number of threads you want: `threadpool thpool = thpool_init(4);`
3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);`
The workers(threads) will start their work automatically as fast as there is new work
in the pool. If you want to wait for all added work to be finished before continuing
you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use
`thpool_destroy(thpool);`.
## API
For a deeper look into the documentation check in the [thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) file. Below is a fast practical overview.
| Function example | Description |
|---------------------------------|---------------------------------------------------------------------|
| ***thpool_init(4)*** | Will return a new threadpool with `4` threads. |
| ***thpool_add_work(thpool, (void*)function_p, (void*)arg_p)*** | Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, `NULL` should be passed. |
| ***thpool_wait(thpool)*** | Will wait for all jobs (both in queue and currently running) to finish. |
| ***thpool_destroy(thpool)*** | This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. |
| ***thpool_pause(thpool)*** | All threads in the threadpool will pause no matter if they are idle or executing work. |
| ***thpool_resume(thpool)*** | If the threadpool is paused, then all threads will resume from where they were. |
| ***thpool_num_threads_working(thpool)*** | Will return the number of currently working threads. |
## Contribution
You are very welcome to contribute. If you have a new feature in mind, you can always open an issue on github describing it so you don't end up doing a lot of work that might not be eventually merged. Generally we are very open to contributions as long as they follow the below keypoints.
* Try to keep the API as minimal as possible. That means if a feature or fix can be implemented without affecting the existing API but requires more development time, then we will opt to sacrifice development time.
* Solutions need to be POSIX compliant. The thread-pool is advertised as such so it makes sense that it actually is.
* For coding style simply try to stick to the conventions you find in the existing codebase.
* Tests: A new fix or feature should be covered by tests. If the existing tests are not sufficient, we expect an according test to follow with the pull request.
* Documentation: for a new feature please add documentation. For an API change the documentation has to be thorough and super easy to understand.
If you wish to **get access as a collaborator** feel free to mention it in the issue https://github.com/Pithikos/C-Thread-Pool/issues/78

View File

@@ -1,571 +0,0 @@
/* ********************************
* Author: Johan Hanssen Seferidis
* License: MIT
* Description: Library providing a threading pool where you can add
* work. For usage, check the thpool.h file or README.md
*
*//** @file thpool.h *//*
*
********************************/
#if defined(__APPLE__)
#include <AvailabilityMacros.h>
#else
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#endif
#endif
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#if defined(__FreeBSD__) || defined(__OpenBSD__)
#include <pthread_np.h>
#endif
#include "thpool.h"
#ifdef THPOOL_DEBUG
#define THPOOL_DEBUG 1
#else
#define THPOOL_DEBUG 0
#endif
#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
#define err(str) fprintf(stderr, str)
#else
#define err(str)
#endif
#ifndef THPOOL_THREAD_NAME
#define THPOOL_THREAD_NAME thpool
#endif
#define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x)
static volatile int threads_keepalive;
static volatile int threads_on_hold;
/* ========================== STRUCTURES ============================ */
/* Binary semaphore */
typedef struct bsem {
pthread_mutex_t mutex;
pthread_cond_t cond;
int v;
} bsem;
/* Job */
typedef struct job{
struct job* prev; /* pointer to previous job */
void (*function)(void* arg); /* function pointer */
void* arg; /* function's argument */
} job;
/* Job queue */
typedef struct jobqueue{
pthread_mutex_t rwmutex; /* used for queue r/w access */
job *front; /* pointer to front of queue */
job *rear; /* pointer to rear of queue */
bsem *has_jobs; /* flag as binary semaphore */
int len; /* number of jobs in queue */
} jobqueue;
/* Thread */
typedef struct thread{
int id; /* friendly id */
pthread_t pthread; /* pointer to actual thread */
struct thpool_* thpool_p; /* access to thpool */
} thread;
/* Threadpool */
typedef struct thpool_{
thread** threads; /* pointer to threads */
volatile int num_threads_alive; /* threads currently alive */
volatile int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
pthread_cond_t threads_all_idle; /* signal to thpool_wait */
jobqueue jobqueue; /* job queue */
} thpool_;
/* ========================== PROTOTYPES ============================ */
static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
static void* thread_do(struct thread* thread_p);
static void thread_hold(int sig_id);
static void thread_destroy(struct thread* thread_p);
static int jobqueue_init(jobqueue* jobqueue_p);
static void jobqueue_clear(jobqueue* jobqueue_p);
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
static struct job* jobqueue_pull(jobqueue* jobqueue_p);
static void jobqueue_destroy(jobqueue* jobqueue_p);
static void bsem_init(struct bsem *bsem_p, int value);
static void bsem_reset(struct bsem *bsem_p);
static void bsem_post(struct bsem *bsem_p);
static void bsem_post_all(struct bsem *bsem_p);
static void bsem_wait(struct bsem *bsem_p);
/* ========================== THREADPOOL ============================ */
/* Initialise thread pool */
struct thpool_* thpool_init(int num_threads){
threads_on_hold = 0;
threads_keepalive = 1;
if (num_threads < 0){
num_threads = 0;
}
/* Make new thread pool */
thpool_* thpool_p;
thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
if (thpool_p == NULL){
err("thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
}
thpool_p->num_threads_alive = 0;
thpool_p->num_threads_working = 0;
/* Initialise the job queue */
if (jobqueue_init(&thpool_p->jobqueue) == -1){
err("thpool_init(): Could not allocate memory for job queue\n");
free(thpool_p);
return NULL;
}
/* Make threads in pool */
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
if (thpool_p->threads == NULL){
err("thpool_init(): Could not allocate memory for threads\n");
jobqueue_destroy(&thpool_p->jobqueue);
free(thpool_p);
return NULL;
}
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
pthread_cond_init(&thpool_p->threads_all_idle, NULL);
/* Thread init */
int n;
for (n=0; n<num_threads; n++){
thread_init(thpool_p, &thpool_p->threads[n], n);
#if THPOOL_DEBUG
printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
#endif
}
/* Wait for threads to initialize */
while (thpool_p->num_threads_alive != num_threads) {}
return thpool_p;
}
/* Add work to the thread pool */
int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
job* newjob;
newjob=(struct job*)malloc(sizeof(struct job));
if (newjob==NULL){
err("thpool_add_work(): Could not allocate memory for new job\n");
return -1;
}
/* add function and argument */
newjob->function=function_p;
newjob->arg=arg_p;
/* add job to queue */
jobqueue_push(&thpool_p->jobqueue, newjob);
return 0;
}
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
pthread_mutex_lock(&thpool_p->thcount_lock);
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
/* Destroy the threadpool */
void thpool_destroy(thpool_* thpool_p){
/* No need to destroy if it's NULL */
if (thpool_p == NULL) return ;
volatile int threads_total = thpool_p->num_threads_alive;
/* End each thread 's infinite loop */
threads_keepalive = 0;
/* Give one second to kill idle threads */
double TIMEOUT = 1.0;
time_t start, end;
double tpassed = 0.0;
time (&start);
while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue.has_jobs);
time (&end);
tpassed = difftime(end,start);
}
/* Poll remaining threads */
while (thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue.has_jobs);
sleep(1);
}
/* Job queue cleanup */
jobqueue_destroy(&thpool_p->jobqueue);
/* Deallocs */
int n;
for (n=0; n < threads_total; n++){
thread_destroy(thpool_p->threads[n]);
}
free(thpool_p->threads);
free(thpool_p);
}
/* Pause all threads in threadpool */
void thpool_pause(thpool_* thpool_p) {
int n;
for (n=0; n < thpool_p->num_threads_alive; n++){
pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
}
}
/* Resume all threads in threadpool */
void thpool_resume(thpool_* thpool_p) {
// resuming a single threadpool hasn't been
// implemented yet, meanwhile this suppresses
// the warnings
(void)thpool_p;
threads_on_hold = 0;
}
int thpool_num_threads_working(thpool_* thpool_p){
return thpool_p->num_threads_working;
}
/* ============================ THREAD ============================== */
/* Initialize a thread in the thread pool
*
* @param thread address to the pointer of the thread to be created
* @param id id to be given to the thread
* @return 0 on success, -1 otherwise.
*/
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
*thread_p = (struct thread*)malloc(sizeof(struct thread));
if (*thread_p == NULL){
err("thread_init(): Could not allocate memory for thread\n");
return -1;
}
(*thread_p)->thpool_p = thpool_p;
(*thread_p)->id = id;
pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p));
pthread_detach((*thread_p)->pthread);
return 0;
}
/* Sets the calling thread on hold */
static void thread_hold(int sig_id) {
(void)sig_id;
threads_on_hold = 1;
while (threads_on_hold){
sleep(1);
}
}
/* What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interrupted is once
* thpool_destroy() is invoked or the program exits.
*
* @param thread thread that will run this function
* @return nothing
*/
static void* thread_do(struct thread* thread_p){
/* Set thread name for profiling and debugging */
char thread_name[16] = {0};
snprintf(thread_name, 16, TOSTRING(THPOOL_THREAD_NAME) "-%d", thread_p->id);
#if defined(__linux__)
/* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
prctl(PR_SET_NAME, thread_name);
#elif defined(__APPLE__) && defined(__MACH__)
pthread_setname_np(thread_name);
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(thread_p->pthread, thread_name);
#else
err("thread_do(): pthread_setname_np is not supported on this system");
#endif
/* Assure all threads have been created before starting serving */
thpool_* thpool_p = thread_p->thpool_p;
/* Register signal handler */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_ONSTACK;
act.sa_handler = thread_hold;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
err("thread_do(): cannot handle SIGUSR1");
}
/* Mark thread as alive (initialized) */
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_alive += 1;
pthread_mutex_unlock(&thpool_p->thcount_lock);
while(threads_keepalive){
bsem_wait(thpool_p->jobqueue.has_jobs);
if (threads_keepalive){
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working++;
pthread_mutex_unlock(&thpool_p->thcount_lock);
/* Read job from queue and execute it */
void (*func_buff)(void*);
void* arg_buff;
job* job_p = jobqueue_pull(&thpool_p->jobqueue);
if (job_p) {
func_buff = job_p->function;
arg_buff = job_p->arg;
func_buff(arg_buff);
free(job_p);
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working--;
if (!thpool_p->num_threads_working) {
pthread_cond_signal(&thpool_p->threads_all_idle);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_alive --;
pthread_mutex_unlock(&thpool_p->thcount_lock);
return NULL;
}
/* Frees a thread */
static void thread_destroy (thread* thread_p){
free(thread_p);
}
/* ============================ JOB QUEUE =========================== */
/* Initialize queue */
static int jobqueue_init(jobqueue* jobqueue_p){
jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (jobqueue_p->has_jobs == NULL){
return -1;
}
pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
bsem_init(jobqueue_p->has_jobs, 0);
return 0;
}
/* Clear the queue */
static void jobqueue_clear(jobqueue* jobqueue_p){
while(jobqueue_p->len){
free(jobqueue_pull(jobqueue_p));
}
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
bsem_reset(jobqueue_p->has_jobs);
jobqueue_p->len = 0;
}
/* Add (allocated) job to queue
*/
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
pthread_mutex_lock(&jobqueue_p->rwmutex);
newjob->prev = NULL;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
jobqueue_p->front = newjob;
jobqueue_p->rear = newjob;
break;
default: /* if jobs in queue */
jobqueue_p->rear->prev = newjob;
jobqueue_p->rear = newjob;
}
jobqueue_p->len++;
bsem_post(jobqueue_p->has_jobs);
pthread_mutex_unlock(&jobqueue_p->rwmutex);
}
/* Get first job from queue(removes it from queue)
* Notice: Caller MUST hold a mutex
*/
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
pthread_mutex_lock(&jobqueue_p->rwmutex);
job* job_p = jobqueue_p->front;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
break;
case 1: /* if one job in queue */
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->len = 0;
break;
default: /* if >1 jobs in queue */
jobqueue_p->front = job_p->prev;
jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(jobqueue_p->has_jobs);
}
pthread_mutex_unlock(&jobqueue_p->rwmutex);
return job_p;
}
/* Free all queue resources back to the system */
static void jobqueue_destroy(jobqueue* jobqueue_p){
jobqueue_clear(jobqueue_p);
free(jobqueue_p->has_jobs);
}
/* ======================== SYNCHRONISATION ========================= */
/* Init semaphore to 1 or 0 */
static void bsem_init(bsem *bsem_p, int value) {
if (value < 0 || value > 1) {
err("bsem_init(): Binary semaphore can take only values 1 or 0");
exit(1);
}
pthread_mutex_init(&(bsem_p->mutex), NULL);
pthread_cond_init(&(bsem_p->cond), NULL);
bsem_p->v = value;
}
/* Reset semaphore to 0 */
static void bsem_reset(bsem *bsem_p) {
pthread_mutex_destroy(&(bsem_p->mutex));
pthread_cond_destroy(&(bsem_p->cond));
bsem_init(bsem_p, 0);
}
/* Post to at least one thread */
static void bsem_post(bsem *bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
bsem_p->v = 1;
pthread_cond_signal(&bsem_p->cond);
pthread_mutex_unlock(&bsem_p->mutex);
}
/* Post to all threads */
static void bsem_post_all(bsem *bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
bsem_p->v = 1;
pthread_cond_broadcast(&bsem_p->cond);
pthread_mutex_unlock(&bsem_p->mutex);
}
/* Wait on semaphore until semaphore has value 0 */
static void bsem_wait(bsem* bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
while (bsem_p->v != 1) {
pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
}
bsem_p->v = 0;
pthread_mutex_unlock(&bsem_p->mutex);
}

View File

@@ -1,187 +0,0 @@
/**********************************
* @author Johan Hanssen Seferidis
* License: MIT
*
**********************************/
#ifndef _THPOOL_
#define _THPOOL_
#ifdef __cplusplus
extern "C" {
#endif
/* =================================== API ======================================= */
typedef struct thpool_* threadpool;
/**
* @brief Initialize threadpool
*
* Initializes a threadpool. This function will not return until all
* threads have initialized successfully.
*
* @example
*
* ..
* threadpool thpool; //First we declare a threadpool
* thpool = thpool_init(4); //then we initialize it to 4 threads
* ..
*
* @param num_threads number of threads to be created in the threadpool
* @return threadpool created threadpool on success,
* NULL on error
*/
threadpool thpool_init(int num_threads);
/**
* @brief Add work to the job queue
*
* Takes an action and its argument and adds it to the threadpool's job queue.
* If you want to add to work a function with more than one arguments then
* a way to implement this is by passing a pointer to a structure.
*
* NOTICE: You have to cast both the function and argument to not get warnings.
*
* @example
*
* void print_num(int num){
* printf("%d\n", num);
* }
*
* int main() {
* ..
* int a = 10;
* thpool_add_work(thpool, (void*)print_num, (void*)a);
* ..
* }
*
* @param threadpool threadpool to which the work will be added
* @param function_p pointer to function to add as work
* @param arg_p pointer to an argument
* @return 0 on success, -1 otherwise.
*/
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
/**
* @brief Wait for all queued jobs to finish
*
* Will wait for all jobs - both queued and currently running to finish.
* Once the queue is empty and all work has completed, the calling thread
* (probably the main program) will continue.
*
* Smart polling is used in wait. The polling is initially 0 - meaning that
* there is virtually no polling at all. If after 1 seconds the threads
* haven't finished, the polling interval starts growing exponentially
* until it reaches max_secs seconds. Then it jumps down to a maximum polling
* interval assuming that heavy processing is being used in the threadpool.
*
* @example
*
* ..
* threadpool thpool = thpool_init(4);
* ..
* // Add a bunch of work
* ..
* thpool_wait(thpool);
* puts("All added work has finished");
* ..
*
* @param threadpool the threadpool to wait for
* @return nothing
*/
void thpool_wait(threadpool);
/**
* @brief Pauses all threads immediately
*
* The threads will be paused no matter if they are idle or working.
* The threads return to their previous states once thpool_resume
* is called.
*
* While the thread is being paused, new work can be added.
*
* @example
*
* threadpool thpool = thpool_init(4);
* thpool_pause(thpool);
* ..
* // Add a bunch of work
* ..
* thpool_resume(thpool); // Let the threads start their magic
*
* @param threadpool the threadpool where the threads should be paused
* @return nothing
*/
void thpool_pause(threadpool);
/**
* @brief Unpauses all threads if they are paused
*
* @example
* ..
* thpool_pause(thpool);
* sleep(10); // Delay execution 10 seconds
* thpool_resume(thpool);
* ..
*
* @param threadpool the threadpool where the threads should be unpaused
* @return nothing
*/
void thpool_resume(threadpool);
/**
* @brief Destroy the threadpool
*
* This will wait for the currently active threads to finish and then 'kill'
* the whole threadpool to free up memory.
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
* threadpool thpool2 = thpool_init(2);
* ..
* thpool_destroy(thpool1);
* ..
* return 0;
* }
*
* @param threadpool the threadpool to destroy
* @return nothing
*/
void thpool_destroy(threadpool);
/**
* @brief Show currently working threads
*
* Working threads are the threads that are performing work (not idle).
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
* threadpool thpool2 = thpool_init(2);
* ..
* printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
* ..
* return 0;
* }
*
* @param threadpool the threadpool of interest
* @return integer number of threads working
*/
int thpool_num_threads_working(threadpool);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -80,7 +80,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get
WORKDIR /go/src/app
COPY --link *.* ./
COPY --link caddy caddy
COPY --link C-Thread-Pool C-Thread-Pool
COPY --link internal internal
COPY --link testdata testdata

View File

@@ -78,7 +78,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get
WORKDIR /go/src/app
COPY --link *.* ./
COPY --link caddy caddy
COPY --link C-Thread-Pool C-Thread-Pool
COPY --link internal internal
COPY --link testdata testdata

View File

@@ -18,9 +18,6 @@
#include <stdlib.h>
#include <unistd.h>
#include "C-Thread-Pool/thpool.c"
#include "C-Thread-Pool/thpool.h"
#include "_cgo_export.h"
#include "frankenphp_arginfo.h"
@@ -726,7 +723,7 @@ sapi_module_struct frankenphp_sapi_module = {
STANDARD_SAPI_MODULE_PROPERTIES};
static void *manager_thread(void *arg) {
void frankenphp_prepare_thread() {
// SIGPIPE must be masked in non-Go threads:
// https://pkg.go.dev/os/signal#hdr-Go_programs_that_use_cgo_or_SWIG
sigset_t set;
@@ -737,10 +734,9 @@ static void *manager_thread(void *arg) {
perror("failed to block SIGPIPE");
exit(EXIT_FAILURE);
}
}
int num_threads = *((int *)arg);
free(arg);
arg = NULL;
void frankenphp_prepare_sapi(int num_threads) {
#ifdef ZTS
#if (PHP_VERSION_ID >= 80300)
@@ -767,18 +763,9 @@ static void *manager_thread(void *arg) {
#endif
frankenphp_sapi_module.startup(&frankenphp_sapi_module);
}
threadpool thpool = thpool_init(num_threads);
uintptr_t rh;
while ((rh = go_fetch_request())) {
thpool_add_work(thpool, go_execute_script, (void *)rh);
}
/* channel closed, shutdown gracefully */
thpool_wait(thpool);
thpool_destroy(thpool);
void frankenphp_shutdown_sapi() {
frankenphp_sapi_module.shutdown(&frankenphp_sapi_module);
sapi_shutdown();
@@ -792,26 +779,6 @@ static void *manager_thread(void *arg) {
frankenphp_sapi_module.ini_entries = NULL;
}
#endif
go_shutdown();
return NULL;
}
int frankenphp_init(int num_threads) {
pthread_t thread;
int *num_threads_ptr = calloc(1, sizeof(int));
*num_threads_ptr = num_threads;
if (pthread_create(&thread, NULL, *manager_thread, (void *)num_threads_ptr) !=
0) {
go_shutdown();
return -1;
}
return pthread_detach(thread);
}
int frankenphp_request_startup() {

View File

@@ -5,10 +5,6 @@
// [FrankenPHP app server]: https://frankenphp.dev
package frankenphp
//go:generate rm -Rf C-Thread-Pool/
//go:generate git clone --depth=1 git@github.com:Pithikos/C-Thread-Pool.git
//go:generate rm -Rf C-Thread-Pool/.git C-Thread-Pool/.github C-Thread-Pool/docs C-Thread-Pool/tests C-Thread-Pool/example.c
// Use PHP includes corresponding to your PHP installation by running:
//
// export CGO_CFLAGS=$(php-config --includes)
@@ -17,7 +13,7 @@ package frankenphp
// We also set these flags for hardening: https://github.com/docker-library/php/blob/master/8.2/bookworm/zts/Dockerfile#L57-L59
// #cgo darwin pkg-config: libxml-2.0
// #cgo CFLAGS: -Wall -Werror -fstack-protector-strong -fpic -fpie -O2 -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64
// #cgo CFLAGS: -Werror -fstack-protector-strong -fpic -fpie -O2 -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64
// #cgo CFLAGS: -I/usr/local/include/php -I/usr/local/include/php/main -I/usr/local/include/php/TSRM -I/usr/local/include/php/Zend -I/usr/local/include/php/ext -I/usr/local/include/php/ext/date/lib
// #cgo CFLAGS: -DTHREAD_NAME=frankenphp
// #cgo linux CFLAGS: -D_GNU_SOURCE
@@ -317,9 +313,7 @@ func Init(options ...Option) error {
done = make(chan struct{})
requestChan = make(chan *http.Request)
if C.frankenphp_init(C.int(opt.numThreads)) != 0 {
return MainThreadCreationError
}
go startMainThreads(opt.numThreads)
if err := initWorkers(opt.workers); err != nil {
return err
@@ -333,6 +327,34 @@ func Init(options ...Option) error {
return nil
}
func startMainThreads(numThreads int) bool {
// prevent sharing this thread with other go funcs
runtime.LockOSThread()
// note: we do NOT unlock the thread so that Go will not try to reuse it when this func completes
var threads sync.WaitGroup
C.frankenphp_prepare_thread()
C.frankenphp_prepare_sapi(C.int(numThreads))
for i := 0; i < numThreads; i++ {
threads.Add(1)
go func() {
runtime.LockOSThread()
defer threads.Done()
C.frankenphp_prepare_thread()
go_fetch_and_execute()
}()
}
threads.Wait()
C.frankenphp_shutdown_sapi()
go_shutdown()
return true
}
// Shutdown stops the workers and the PHP runtime.
func Shutdown() {
stopWorkers()
@@ -468,16 +490,34 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
return nil
}
//export go_fetch_request
func go_fetch_request() C.uintptr_t {
select {
case <-done:
return 0
//export go_fetch_and_execute
func go_fetch_and_execute() {
for {
select {
case <-done:
return
case r := <-requestChan:
h := cgo.NewHandle(r)
r.Context().Value(handleKey).(*handleList).AddHandle(h)
case r := <-requestChan:
h := cgo.NewHandle(r)
r.Context().Value(handleKey).(*handleList).AddHandle(h)
return C.uintptr_t(h)
fc, ok := FromContext(r.Context())
if !ok {
panic(InvalidRequestError)
}
if err := updateServerContext(r, true, 0); err != nil {
panic(err)
}
// scriptFilename is freed in frankenphp_execute_script()
fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename))
if fc.exitStatus < 0 {
panic(ScriptExecutionError)
}
maybeCloseContext(fc)
r.Context().Value(handleKey).(*handleList).FreeAll()
}
}
}
@@ -487,33 +527,6 @@ func maybeCloseContext(fc *FrankenPHPContext) {
})
}
// go_execute_script Note: only called in cgi-mode
//
//export go_execute_script
func go_execute_script(rh unsafe.Pointer) {
handle := cgo.Handle(rh)
request := handle.Value().(*http.Request)
fc, ok := FromContext(request.Context())
if !ok {
panic(InvalidRequestError)
}
defer func() {
maybeCloseContext(fc)
request.Context().Value(handleKey).(*handleList).FreeAll()
}()
if err := updateServerContext(request, true, 0); err != nil {
panic(err)
}
// scriptFilename is freed in frankenphp_execute_script()
fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename))
if fc.exitStatus < 0 {
panic(ScriptExecutionError)
}
}
//export go_ub_write
func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) {
r := cgo.Handle(rh).Value().(*http.Request)

View File

@@ -40,7 +40,9 @@ typedef struct frankenphp_config {
} frankenphp_config;
frankenphp_config frankenphp_get_config();
int frankenphp_init(int num_threads);
void frankenphp_prepare_thread();
void frankenphp_prepare_sapi(int num_threads);
void frankenphp_shutdown_sapi();
int frankenphp_update_server_context(
bool create, uintptr_t current_request, uintptr_t main_request,

View File

@@ -83,7 +83,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get
WORKDIR /go/src/app
COPY *.* ./
COPY caddy caddy
COPY C-Thread-Pool C-Thread-Pool
RUN --mount=type=secret,id=github-token GITHUB_TOKEN=$(cat /run/secrets/github-token) ./build-static.sh && \
rm -Rf dist/static-php-cli/source/*