diff --git a/osal/inc/mpp_thread.h b/osal/inc/mpp_thread.h index fa0cb5fa..19688cb3 100644 --- a/osal/inc/mpp_thread.h +++ b/osal/inc/mpp_thread.h @@ -58,6 +58,7 @@ typedef void *(*MppThreadFunc)(void *); typedef enum { MPP_THREAD_UNINITED, + MPP_THREAD_READY, MPP_THREAD_RUNNING, MPP_THREAD_WAITING, MPP_THREAD_STOPPING, @@ -294,4 +295,76 @@ private: #endif +/* + * status transaction: + * new + * v + * MPP_THREAD_UNINITED + * v + * setup + * v + * delete <- MPP_THREAD_READY <-------------------+ + * v | + * start | + * v | + * MPP_THREAD_RUNNING -> stop -> MPP_THREAD_STOPPING + * v | + * wait | + * v | + * MPP_THREAD_WAITING -> stop ------------+ + * + */ +typedef enum MppSThdStatus_e { + MPP_STHD_UNINITED, + MPP_STHD_READY, + MPP_STHD_RUNNING, + MPP_STHD_WAITING, + MPP_STHD_STOPPING, + MPP_STHD_BUTT, +} MppSThdStatus; + +/* MppSThd for Mpp Simple Thread */ +typedef void* MppSThd; +typedef void* MppSThdGrp; + +typedef struct MppSThdCtx_t { + MppSThd thd; + void *ctx; +} MppSThdCtx; + +typedef void *(*MppSThdFunc)(MppSThdCtx *); + +MppSThd mpp_sthd_get(const char *name); +void mpp_sthd_put(MppSThd thd); + +MppSThdStatus mpp_sthd_get_status(MppSThd thd); +const char* mpp_sthd_get_name(MppSThd thd); +RK_S32 mpp_sthd_get_idx(MppSThd thd); +RK_S32 mpp_sthd_check(MppSThd thd); + +void mpp_sthd_setup(MppSThd thd, MppSThdFunc func, void *ctx); + +void mpp_sthd_start(MppSThd thd); +void mpp_sthd_stop(MppSThd thd); +void mpp_sthd_stop_sync(MppSThd thd); + +void mpp_sthd_lock(MppSThd thd); +void mpp_sthd_unlock(MppSThd thd); +int mpp_sthd_trylock(MppSThd thd); + +void mpp_sthd_wait(MppSThd thd); +void mpp_sthd_signal(MppSThd thd); +void mpp_sthd_broadcast(MppSThd thd); + +/* multi-thread group with same callback and context */ +MppSThdGrp mpp_sthd_grp_get(const char *name, RK_S32 count); +void mpp_sthd_grp_put(MppSThdGrp grp); + +void mpp_sthd_grp_setup(MppSThdGrp grp, MppSThdFunc func, void *ctx); +MppSThd mpp_sthd_grp_get_each(MppSThdGrp grp, RK_S32 idx); + +void mpp_sthd_grp_start(MppSThdGrp grp); +void mpp_sthd_grp_stop(MppSThdGrp grp); +void mpp_sthd_grp_stop_sync(MppSThdGrp grp); + #endif /*__MPP_THREAD_H__*/ diff --git a/osal/mpp_thread.cpp b/osal/mpp_thread.cpp index 9183a1dc..a5fc5074 100644 --- a/osal/mpp_thread.cpp +++ b/osal/mpp_thread.cpp @@ -19,6 +19,8 @@ #include #include "mpp_log.h" +#include "mpp_mem.h" +#include "mpp_lock.h" #include "mpp_common.h" #include "mpp_thread.h" @@ -150,3 +152,525 @@ int pthread_setname_np(pthread_t thread, const char *name) #endif +typedef struct MppSThdImpl_t { + char *name; + MppSThdFunc func; + MppSThdStatus status; + RK_S32 idx; + pthread_t thd; + pthread_mutex_t lock; + pthread_cond_t cond; + MppSThdCtx ctx; +} MppSThdImpl; + +typedef struct MppSThdGrpImpl_t { + char name[THREAD_NAME_LEN]; + RK_S32 count; + MppSThdStatus status; + pthread_mutex_t lock; + MppSThdImpl thds[]; +} MppSThdGrpImpl; + +static const char *state2str(MppSThdStatus state) +{ + static const char *strof_sthd_status[] = { + "uninited", + "ready", + "running", + "waiting", + "stopping", + "invalid" + }; + + return state < MPP_STHD_BUTT ? strof_sthd_status[state] : strof_sthd_status[MPP_STHD_BUTT]; +} + +static RK_S32 check_sthd(const char *name, MppSThdImpl *thd) +{ + if (!thd) { + mpp_err("MppSThd NULL found at %s\n", name); + return MPP_NOK; + } + + if (thd->ctx.thd != thd) { + mpp_err("MppSThd check %p:%p mismatch at %s\n", thd->ctx.thd, thd, name); + return MPP_NOK; + } + + return MPP_OK; +} + +#define CHECK_STHD(thd) check_sthd(__FUNCTION__, (MppSThdImpl *)(thd)) + +static void mpp_sthd_init(MppSThdImpl *thd, RK_S32 idx) +{ + pthread_mutexattr_t attr; + + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&thd->lock, &attr); + pthread_mutexattr_destroy(&attr); + + pthread_cond_init(&thd->cond, NULL); + thd->ctx.thd = thd; + thd->idx = idx; +} + +static void mpp_sthd_deinit(MppSThdImpl *thd) +{ + mpp_assert(thd->ctx.thd == thd); + mpp_assert(thd->status < MPP_STHD_RUNNING); + + pthread_mutex_lock(&thd->lock); + thd->status = MPP_STHD_UNINITED; + thd->ctx.thd = NULL; + pthread_mutex_unlock(&thd->lock); + + pthread_cond_destroy(&thd->cond); + pthread_mutex_destroy(&thd->lock); +} + +static MPP_RET mpp_sthd_create(MppSThdImpl *thd) +{ + pthread_attr_t attr; + MPP_RET ret = MPP_NOK; + + mpp_assert(thd->ctx.thd == thd); + mpp_assert(thd->status < MPP_STHD_RUNNING); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + // NOTE: set status to running first + thd->status = MPP_STHD_RUNNING; + if (0 == pthread_create(&thd->thd, &attr, (MppThreadFunc)thd->func, &thd->ctx)) { + ret = (MPP_RET)pthread_setname_np(thd->thd, thd->name); + if (ret) + mpp_err("%s %p setname failed\n", thd->thd, thd->func); + + thread_dbg(MPP_THREAD_DBG_FUNCTION, "thread %s %p context %p create success\n", + thd->name, thd->func, thd->ctx.ctx); + ret = MPP_OK; + } else { + thd->status = MPP_STHD_READY; + } + + pthread_attr_destroy(&attr); + + return ret; +} + +MppSThd mpp_sthd_get(const char *name) +{ + RK_S32 size = MPP_ALIGN(sizeof(MppSThdImpl), 8) + THREAD_NAME_LEN; + MppSThdImpl *thd = mpp_calloc_size(MppSThdImpl, size); + + if (!thd) { + mpp_err_f("failed to create simple thread\n"); + return NULL; + } + + thd->name = (char *)(thd + 1); + if (!name) + name = "mpp_sthd"; + + snprintf(thd->name, THREAD_NAME_LEN - 1, "%s", name); + + mpp_sthd_init(thd, -1); + + return thd; +} + +void mpp_sthd_put(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + mpp_assert(impl); + mpp_assert(impl->ctx.thd == impl); + mpp_assert(impl->status == MPP_STHD_UNINITED || impl->status == MPP_STHD_READY); + + mpp_sthd_deinit(impl); + + mpp_free(impl); +} + +MppSThdStatus mpp_sthd_get_status(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + return impl->status; +} + +const char* mpp_sthd_get_name(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + return impl->name; +} + +RK_S32 mpp_sthd_get_idx(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + return impl->idx; +} + +RK_S32 mpp_sthd_check(MppSThd thd) +{ + return CHECK_STHD(thd); +} + +void mpp_sthd_setup(MppSThd thd, MppSThdFunc func, void *ctx) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + MppSThdStatus status; + + CHECK_STHD(impl); + + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_UNINITED : + case MPP_STHD_READY : { + impl->func = func; + impl->ctx.ctx = ctx; + impl->status = func ? MPP_STHD_READY : MPP_STHD_UNINITED; + } break; + default : { + mpp_err("%s can NOT setup on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); + + CHECK_STHD(impl); +} + +void mpp_sthd_start(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + MppSThdStatus status; + + CHECK_STHD(impl); + + /* we can only change callback function on uninit */ + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_READY : { + mpp_sthd_create(impl); + } break; + default : { + mpp_err("%s can NOT start on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); + + CHECK_STHD(impl); +} + +void mpp_sthd_stop(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + MppSThdStatus status; + + CHECK_STHD(impl); + + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_RUNNING : + case MPP_STHD_WAITING : { + status = MPP_STHD_STOPPING; + pthread_cond_signal(&impl->cond); + } break; + default : { + mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); + + CHECK_STHD(impl); +} + +void mpp_sthd_stop_sync(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + MppSThdStatus status; + + CHECK_STHD(impl); + + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_STOPPING : { + void *dummy; + + pthread_join(impl->thd, &dummy); + impl->status = MPP_STHD_READY; + } break; + default : { + mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); + + CHECK_STHD(impl); +} + +void mpp_sthd_lock(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + pthread_mutex_lock(&impl->lock); +} + +void mpp_sthd_unlock(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + pthread_mutex_unlock(&impl->lock); +} + +int mpp_sthd_trylock(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + return pthread_mutex_trylock(&impl->lock); +} + +void mpp_sthd_wait(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + if (impl->status == MPP_STHD_RUNNING) + impl->status = MPP_STHD_WAITING; + + pthread_cond_wait(&impl->cond, &impl->lock); + + if (impl->status == MPP_STHD_WAITING) + impl->status = MPP_STHD_RUNNING; +} + +void mpp_sthd_signal(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + pthread_cond_signal(&impl->cond); +} + +void mpp_sthd_broadcast(MppSThd thd) +{ + MppSThdImpl *impl = (MppSThdImpl *)thd; + + CHECK_STHD(impl); + + pthread_cond_broadcast(&impl->cond); +} + +MppSThdGrp mpp_sthd_grp_get(const char *name, RK_S32 count) +{ + MppSThdGrpImpl *grp = NULL; + + if (count > 0) { + RK_S32 elem_size = MPP_ALIGN(sizeof(MppSThdImpl), 8); + RK_S32 total_size = MPP_ALIGN(sizeof(MppSThdGrpImpl), 8) + count * elem_size; + + grp = mpp_calloc_size(MppSThdGrpImpl, total_size); + if (grp) { + pthread_mutexattr_t attr; + RK_S32 i; + + if (!name) + name = "mpp_sthd_grp"; + + snprintf(grp->name, THREAD_NAME_LEN - 1, "%s", name); + + grp->count = count; + for (i = 0; i < count; i++) { + MppSThdImpl *thd = &grp->thds[i]; + + thd->name = grp->name; + mpp_sthd_init(thd, i); + } + + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&grp->lock, &attr); + pthread_mutexattr_destroy(&attr); + } + } + + if (!grp) + mpp_err_f("can NOT create %d threads group\n", count); + + return grp; +} + +void mpp_sthd_grp_put(MppSThdGrp grp) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + RK_S32 i; + + mpp_assert(impl); + mpp_assert(impl->status == MPP_STHD_UNINITED || impl->status == MPP_STHD_READY); + + for (i = 0; i < impl->count; i++) { + MppSThdImpl *thd = &impl->thds[i]; + + mpp_sthd_deinit(thd); + } + + mpp_free(impl); +} + +void mpp_sthd_grp_setup(MppSThdGrp grp, MppSThdFunc func, void *ctx) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + MppSThdStatus status; + + mpp_assert(impl); + + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_UNINITED : + case MPP_STHD_READY : { + MppSThdStatus next = func ? MPP_STHD_READY : MPP_STHD_UNINITED; + RK_S32 i; + + for (i = 0; i < impl->count; i++) { + MppSThdImpl *thd = &impl->thds[i]; + + thd->func = func; + thd->ctx.ctx = ctx; + thd->status = next; + } + impl->status = next; + } break; + default : { + mpp_err("%s can NOT setup on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); +} + +void mpp_sthd_grp_start(MppSThdGrp grp) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + MppSThdStatus status; + + mpp_assert(impl); + + /* we can only change callback function on uninit */ + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_READY : { + RK_S32 i; + + for (i = 0; i < impl->count; i++) + mpp_sthd_start(&impl->thds[i]); + + impl->status = MPP_STHD_RUNNING; + } break; + default : { + mpp_err("%s can NOT start on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); +} + +void mpp_sthd_grp_stop(MppSThdGrp grp) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + MppSThdStatus status; + + mpp_assert(impl); + + /* we can only change callback function on uninit */ + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_RUNNING : + case MPP_STHD_WAITING : { + RK_S32 i; + + impl->status = MPP_STHD_STOPPING; + + for (i = 0; i < impl->count; i++) { + MppSThdImpl *thd = &impl->thds[i]; + + pthread_mutex_lock(&thd->lock); + thd->status = MPP_STHD_STOPPING; + pthread_cond_signal(&thd->cond); + pthread_mutex_unlock(&thd->lock); + } + } break; + default : { + mpp_err("%s can NOT stop on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); +} + +void mpp_sthd_grp_stop_sync(MppSThdGrp grp) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + MppSThdStatus status; + + mpp_assert(impl); + + /* we can only change callback function on uninit */ + pthread_mutex_lock(&impl->lock); + status = impl->status; + switch (status) { + case MPP_STHD_STOPPING : { + void *dummy; + RK_S32 i; + + status = MPP_STHD_STOPPING; + for (i = 0; i < impl->count; i++) { + MppSThdImpl *thd = &impl->thds[i]; + + pthread_join(thd->thd, &dummy); + thd->status = MPP_STHD_READY; + } + impl->status = MPP_STHD_READY; + } break; + default : { + mpp_err("%s can NOT stop sync on %s\n", impl->name, state2str(status)); + } break; + } + pthread_mutex_unlock(&impl->lock); +} + +MppSThd mpp_sthd_grp_get_each(MppSThdGrp grp, RK_S32 idx) +{ + MppSThdGrpImpl *impl = (MppSThdGrpImpl *)grp; + MppSThd ret = NULL; + + mpp_assert(impl); + mpp_assert(idx >= 0 && idx < impl->count); + + pthread_mutex_lock(&impl->lock); + ret = &impl->thds[idx]; + pthread_mutex_unlock(&impl->lock); + + return ret; +} \ No newline at end of file