[mpp_list]: change mpp_list lock mechanism

1. add a mutex() accessor to mpp_list for external auto-locking
2. remove all locks in list operations, so locking needs to be added externally
[hal_task]: use new mpp_list to implement task group
[mpp_dec/mpp_enc]: change mpp_list usage


git-svn-id: https://10.10.10.66:8443/svn/MediaProcessPlatform/trunk/mpp@231 6e48237b-75ef-9749-8fc9-41990f28c85a
This commit is contained in:
ChenHengming
2015-09-07 16:06:33 +00:00
parent 241f6865ad
commit 7facfbaa5c
8 changed files with 98 additions and 144 deletions

View File

@@ -53,8 +53,8 @@ void *mpp_dec_parser_thread(void *data)
MppThread *parser = mpp->mThreadCodec;
MppDec *dec = mpp->mDec;
MppBufSlots slots = dec->slots;
HalTaskGroup tasks = dec->tasks;
MppPacketImpl packet;
HalTaskHnd task_hnd = NULL;
/*
* parser thread need to wait at cases below:
@@ -87,11 +87,7 @@ void *mpp_dec_parser_thread(void *data)
/*
* 1. get task handle from hal for parsing one frame
*/
if (NULL == task_hnd) {
hal_task_get_hnd(dec->tasks, 0, &task_hnd);
}
wait_on_task = (NULL == task_hnd);
wait_on_task = (MPP_OK != hal_task_can_put(tasks));
if (wait_on_task)
continue;
@@ -99,8 +95,8 @@ void *mpp_dec_parser_thread(void *data)
* 2. get packet to parse
*/
if (!packet_ready) {
Mutex::Autolock autoLock(&mpp->mPacketLock);
mpp_list *packets = mpp->mPackets;
Mutex::Autolock autoLock(packets->mutex());
if (packets->list_size()) {
/*
* packet will be destroyed outside, here just copy the content
@@ -181,9 +177,7 @@ void *mpp_dec_parser_thread(void *data)
* 6. send dxva output information and buffer information to hal thread
* combinate video codec dxva output and buffer information
*/
mpp_assert(task_hnd);
hal_task_set_info(task_hnd, &task_local);
hal_task_set_used(task_hnd, 1);
hal_task_put(tasks, &task_local);
mpp->mTaskPutCount++;
task_ready = 0;
@@ -199,8 +193,8 @@ void *mpp_dec_hal_thread(void *data)
MppThread *hal = mpp->mThreadHal;
MppDec *dec = mpp->mDec;
MppBufSlots slots = dec->slots;
HalTaskGroup tasks = dec->tasks;
mpp_list *frames = mpp->mFrames;
HalTaskHnd task_hnd = NULL;
/*
* hal thread need to wait at cases below:
@@ -222,19 +216,17 @@ void *mpp_dec_hal_thread(void *data)
hal->unlock();
// get hw task first
if (NULL == task_hnd)
hal_task_get_hnd(dec->tasks, 1, &task_hnd);
wait_on_task = (NULL == task_hnd);
wait_on_task = (MPP_OK != hal_task_can_get(tasks));
if (wait_on_task)
continue;
mpp->mTaskGetCount++;
hal_task_get_info(task_hnd, &task_local);
hal_task_get(tasks, &task_local);
// register genertation
mpp_hal_reg_gen(dec->hal_ctx, &task_local);
mpp->mThreadCodec->signal();
/*
* wait previous register set done
@@ -274,7 +266,9 @@ void *mpp_dec_hal_thread(void *data)
mpp_buf_slot_clr_display(slots, output);
frames->lock();
frames->add_at_tail(&frame, sizeof(frame));
frames->unlock();
mpp->mFramePutCount++;
/*
@@ -284,9 +278,6 @@ void *mpp_dec_hal_thread(void *data)
// signal()
// mark frame in output queue
// wait up output thread to get a output frame
hal_task_set_used(task_hnd, 0);
task_hnd = NULL;
mpp->mThreadCodec->signal();
}
return NULL;

View File

@@ -39,11 +39,14 @@ void *mpp_enc_control_thread(void *data)
char *buf = mpp_malloc(char, size);
while (MPP_THREAD_RUNNING == thd_enc->get_status()) {
Mutex::Autolock auto_lock(frames->mutex());
if (frames->list_size()) {
frames->del_at_head(&frame, sizeof(frame));
mpp_packet_init(&packet, buf, size);
packets->lock();
packets->add_at_tail(&packet, sizeof(packet));
packets->unlock();
}
}
mpp_free(buf);
@@ -70,8 +73,10 @@ void *mpp_enc_hal_thread(void *data)
// register genertation
if (tasks->list_size()) {
HalDecTask *task;
mpp->mTasks->del_at_head(&task, sizeof(task));
tasks->lock();
tasks->del_at_head(&task, sizeof(task));
mpp->mTaskGetCount++;
tasks->unlock();
// hal->mpp_hal_reg_gen(current);
@@ -100,8 +105,10 @@ void *mpp_enc_hal_thread(void *data)
MppFrame frame;
mpp_frame_init(&frame);
mpp_frame_set_buffer(frame, buffer);
frames->lock();
frames->add_at_tail(&frame, sizeof(frame));
mpp->mFramePutCount++;
frames->unlock();
}
}

View File

@@ -27,21 +27,16 @@ typedef struct HalTaskImpl_t HalTaskImpl;
typedef struct HalTaskGroupImpl_t HalTaskGroupImpl;
struct HalTaskImpl_t {
struct list_head list;
HalTaskGroupImpl *group;
RK_U32 used;
RK_U32 index;
HalTask task;
};
struct HalTaskGroupImpl_t {
struct list_head list_unused;
struct list_head list_used;
RK_U32 count_unused;
RK_U32 count_used;
RK_U32 count_put;
RK_U32 count_get;
MppCtxType type;
Mutex *lock;
HalTaskImpl *node;
RK_U32 count;
mpp_list *tasks;
};
static size_t get_task_size(HalTaskGroupImpl *group)
@@ -56,29 +51,21 @@ MPP_RET hal_task_group_init(HalTaskGroup *group, MppCtxType type, RK_U32 count)
return MPP_ERR_UNKNOW;
}
HalTaskGroupImpl *p = mpp_malloc_size(HalTaskGroupImpl,
sizeof(HalTaskGroupImpl) +
count * sizeof(HalTaskImpl));
*group = NULL;
HalTaskGroupImpl *p = mpp_malloc(HalTaskGroupImpl, 1);
if (NULL == p) {
*group = NULL;
mpp_err_f("malloc group failed\n");
return MPP_NOK;
}
memset(p, 0, sizeof(*p) + count * sizeof(HalTaskImpl));
INIT_LIST_HEAD(&p->list_unused);
INIT_LIST_HEAD(&p->list_used);
p->lock = new Mutex();
p->node = (HalTaskImpl*)(p + 1);
p->type = type;
Mutex::Autolock auto_lock(p->lock);
RK_U32 i;
for (i = 0; i < count; i++) {
p->node[i].group = p;
p->node[i].used = 0;
p->node[i].index = i;
list_add_tail(&p->node[i].list, &p->list_unused);
p->tasks = new mpp_list(NULL);
if (NULL == p->tasks) {
mpp_err_f("malloc task list failed\n");
mpp_free(p);
return MPP_NOK;
}
p->count_unused = count;
p->type = type;
p->count = count - 1;
p->count_put = p->count_get = 0;
*group = p;
return MPP_OK;
}
@@ -91,71 +78,61 @@ MPP_RET hal_task_group_deinit(HalTaskGroup group)
}
HalTaskGroupImpl *p = (HalTaskGroupImpl *)group;
if (p->lock) {
delete p->lock;
p->lock = NULL;
}
if (p->tasks)
delete p->tasks;
mpp_free(p);
return MPP_OK;
}
MPP_RET hal_task_get_hnd(HalTaskGroup group, RK_U32 used, HalTaskHnd *hnd)
MPP_RET hal_task_can_put(HalTaskGroup group)
{
if (NULL == group || NULL == hnd) {
mpp_err_f("found NULL input group %p hnd %d\n", group, hnd);
if (NULL == group) {
mpp_err_f("found NULL input group\n");
return MPP_ERR_NULL_PTR;
}
HalTaskGroupImpl *p = (HalTaskGroupImpl *)group;
Mutex::Autolock auto_lock(p->lock);
struct list_head *head = (used) ? (&p->list_used) : (&p->list_unused);
if (list_empty(head)) {
*hnd = NULL;
return MPP_NOK;
}
*hnd = list_entry(head->next, HalTaskImpl, list);
return MPP_OK;
mpp_list *tasks = p->tasks;
Mutex::Autolock auto_lock(tasks->mutex());
return (tasks->list_size() < p->count) ? (MPP_OK) : (MPP_NOK);
}
MPP_RET hal_task_set_used(HalTaskHnd hnd, RK_U32 used)
MPP_RET hal_task_can_get(HalTaskGroup group)
{
if (NULL == hnd) {
mpp_err_f("found NULL input\n");
if (NULL == group) {
mpp_err_f("found NULL input group\n");
return MPP_ERR_NULL_PTR;
}
HalTaskImpl *impl = (HalTaskImpl *)hnd;
HalTaskGroupImpl *group = impl->group;
Mutex::Autolock auto_lock(group->lock);
struct list_head *head = (used) ? (&group->list_used) : (&group->list_unused);
list_del_init(&impl->list);
list_add_tail(&impl->list, head);
if (impl->used)
group->count_used--;
else
group->count_unused--;
if (used)
group->count_used++;
else
group->count_unused++;
impl->used = used;
return MPP_OK;
HalTaskGroupImpl *p = (HalTaskGroupImpl *)group;
mpp_list *tasks = p->tasks;
Mutex::Autolock auto_lock(tasks->mutex());
return (tasks->list_size()) ? (MPP_OK) : (MPP_NOK);
}
MPP_RET hal_task_get_info(HalTaskHnd hnd, HalTask *task)
MPP_RET hal_task_put(HalTaskGroup group, HalTask *task)
{
HalTaskImpl *impl = (HalTaskImpl *)hnd;
memcpy(task, &impl->task, get_task_size(impl->group));
MPP_RET ret = hal_task_can_put(group);
mpp_assert(ret == MPP_OK);
HalTaskGroupImpl *p = (HalTaskGroupImpl *)group;
mpp_list *tasks = p->tasks;
Mutex::Autolock auto_lock(tasks->mutex());
tasks->add_at_tail(task, sizeof(*task));
p->count_put++;
return MPP_OK;
}
MPP_RET hal_task_set_info(HalTaskHnd hnd, HalTask *task)
MPP_RET hal_task_get(HalTaskGroup group, HalTask *task)
{
HalTaskImpl *impl = (HalTaskImpl *)hnd;
memcpy(&impl->task, task, get_task_size(impl->group));
MPP_RET ret = hal_task_can_get(group);
mpp_assert(ret == MPP_OK);
HalTaskGroupImpl *p = (HalTaskGroupImpl *)group;
mpp_list *tasks = p->tasks;
Mutex::Autolock auto_lock(tasks->mutex());
tasks->del_at_head(task, sizeof(*task));
p->count_get++;
return MPP_OK;
}

View File

@@ -109,6 +109,9 @@ extern "C" {
/*
* group init / deinit will be called by hal
*
* NOTE: use mpp_list to implement
* the count means the max task waiting for process
*/
MPP_RET hal_task_group_init(HalTaskGroup *group, MppCtxType type, RK_U32 count);
MPP_RET hal_task_group_deinit(HalTaskGroup group);
@@ -118,25 +121,21 @@ MPP_RET hal_task_group_deinit(HalTaskGroup group);
*
* dec:
*
* get_hnd(group, 0, &hnd) - dec get a unused handle first
* parser->parse - parser write a local info in dec
* set_info(hnd, info) - dec write the local info to handle
* set_used(hnd, 1) - decoder set handle to used
* hal_task_can_put(group) - dec test whether can send task to hal
* parser->parse(task) - parser write a local task
* hal_task_put(group, task) - dec send the task to hal
*
* hal:
* get_hnd(group, 1, &hnd)
* read_info(hnd, info)
* set_used(hnd, 0)
*
* these calls do not own syntax handle but just get its reference
* so there is not need to free or destory the handle
* hal_task_can_get(group) - hal test whether there is task waiting for process
* hal_task_get(group, task) - hal get the task to process
*
*/
MPP_RET hal_task_get_hnd(HalTaskGroup group, RK_U32 used, HalTaskHnd *hnd);
MPP_RET hal_task_set_used(HalTaskHnd hnd, RK_U32 used);
MPP_RET hal_task_can_put(HalTaskGroup group);
MPP_RET hal_task_can_get(HalTaskGroup group);
MPP_RET hal_task_get_info(HalTaskHnd hnd, HalTask *task);
MPP_RET hal_task_set_info(HalTaskHnd hnd, HalTask *task);
MPP_RET hal_task_put(HalTaskGroup group, HalTask *task);
MPP_RET hal_task_get(HalTaskGroup group, HalTask *task);
#ifdef __cplusplus
}

View File

@@ -154,7 +154,7 @@ void Mpp::clear()
MPP_RET Mpp::put_packet(MppPacket packet)
{
Mutex::Autolock autoLock(&mPacketLock);
Mutex::Autolock autoLock(mPackets->mutex());
if (mPackets->list_size() < 4) {
mPackets->add_at_tail(packet, sizeof(MppPacketImpl));
mPacketPutCount++;
@@ -166,7 +166,7 @@ MPP_RET Mpp::put_packet(MppPacket packet)
MPP_RET Mpp::get_frame(MppFrame *frame)
{
Mutex::Autolock autoLock(&mFrameLock);
Mutex::Autolock autoLock(mFrames->mutex());
if (mFrames->list_size()) {
mFrames->del_at_tail(frame, sizeof(frame));
mFrameGetCount++;
@@ -177,7 +177,7 @@ MPP_RET Mpp::get_frame(MppFrame *frame)
MPP_RET Mpp::put_frame(MppFrame frame)
{
Mutex::Autolock autoLock(&mFrameLock);
Mutex::Autolock autoLock(mFrames->mutex());
if (mFrames->list_size() < 4) {
mFrames->add_at_tail(frame, sizeof(MppFrameImpl));
mThreadCodec->signal();
@@ -189,7 +189,7 @@ MPP_RET Mpp::put_frame(MppFrame frame)
MPP_RET Mpp::get_packet(MppPacket *packet)
{
Mutex::Autolock autoLock(&mPacketLock);
Mutex::Autolock autoLock(mPackets->mutex());
if (mPackets->list_size()) {
mPackets->del_at_tail(packet, sizeof(packet));
mPacketGetCount++;

View File

@@ -68,10 +68,6 @@ public:
MPP_RET get_packet(MppPacket *packet);
Mutex mPacketLock;
Mutex mFrameLock;
Mutex mTaskLock;
mpp_list *mPackets;
mpp_list *mFrames;
mpp_list *mTasks;

View File

@@ -59,12 +59,16 @@ public:
RK_S32 flush();
RK_S32 lock();
RK_S32 unlock();
// open lock function for external combination usage
void lock();
void unlock();
RK_S32 trylock();
// open lock function for external auto lock
Mutex *mutex();
private:
pthread_mutex_t mutex;
Mutex mMutex;
node_destructor destroy;
struct mpp_list_node *head;
RK_S32 count;

View File

@@ -83,7 +83,6 @@ static inline void mpp_list_add_tail(mpp_list_node *_new, mpp_list_node *head)
RK_S32 mpp_list::add_at_head(void *data, RK_S32 size)
{
RK_S32 ret = -EINVAL;
pthread_mutex_lock(&mutex);
if (head) {
mpp_list_node *node = create_list(data, size, 0);
if (node) {
@@ -94,14 +93,12 @@ RK_S32 mpp_list::add_at_head(void *data, RK_S32 size)
ret = -ENOMEM;
}
}
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::add_at_tail(void *data, RK_S32 size)
{
RK_S32 ret = -EINVAL;
pthread_mutex_lock(&mutex);
if (head) {
mpp_list_node *node = create_list(data, size, 0);
if (node) {
@@ -112,7 +109,6 @@ RK_S32 mpp_list::add_at_tail(void *data, RK_S32 size)
ret = -ENOMEM;
}
}
pthread_mutex_unlock(&mutex);
return ret;
}
@@ -155,50 +151,40 @@ static inline void _list_del_node_no_lock(mpp_list_node *node, void *data, RK_S3
RK_S32 mpp_list::del_at_head(void *data, RK_S32 size)
{
RK_S32 ret = -EINVAL;
pthread_mutex_lock(&mutex);
if (head && count) {
_list_del_node_no_lock(head->next, data, size);
count--;
ret = 0;
}
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::del_at_tail(void *data, RK_S32 size)
{
RK_S32 ret = -EINVAL;
pthread_mutex_lock(&mutex);
if (head && count) {
_list_del_node_no_lock(head->prev, data, size);
count--;
pthread_mutex_unlock(&mutex);
ret = 0;
}
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::list_is_empty()
{
pthread_mutex_lock(&mutex);
RK_S32 ret = (count == 0);
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::list_size()
{
pthread_mutex_lock(&mutex);
RK_S32 ret = count;
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::add_by_key(void *data, RK_S32 size, RK_U32 *key)
{
RK_S32 ret = 0;
pthread_mutex_lock(&mutex);
if (head) {
RK_U32 list_key = get_key();
*key = list_key;
@@ -211,14 +197,12 @@ RK_S32 mpp_list::add_by_key(void *data, RK_S32 size, RK_U32 *key)
ret = -ENOMEM;
}
}
pthread_mutex_unlock(&mutex);
return ret;
}
RK_S32 mpp_list::del_by_key(void *data, RK_S32 size, RK_U32 key)
{
RK_S32 ret = 0;
pthread_mutex_lock(&mutex);
if (head && count) {
struct mpp_list_node *tmp = head->next;
ret = -EINVAL;
@@ -230,7 +214,6 @@ RK_S32 mpp_list::del_by_key(void *data, RK_S32 size, RK_U32 key)
}
}
}
pthread_mutex_unlock(&mutex);
return ret;
}
@@ -245,7 +228,6 @@ RK_S32 mpp_list::show_by_key(void *data, RK_U32 key)
RK_S32 mpp_list::flush()
{
pthread_mutex_lock(&mutex);
if (head) {
while (count) {
mpp_list_node* node = head->next;
@@ -257,23 +239,27 @@ RK_S32 mpp_list::flush()
count--;
}
}
pthread_mutex_unlock(&mutex);
return 0;
}
RK_S32 mpp_list::lock()
void mpp_list::lock()
{
return pthread_mutex_lock(&mutex);
mMutex.lock();
}
RK_S32 mpp_list::unlock()
void mpp_list::unlock()
{
return pthread_mutex_unlock(&mutex);
mMutex.unlock();
}
RK_S32 mpp_list::trylock()
{
return pthread_mutex_trylock(&mutex);
return mMutex.trylock();
}
Mutex *mpp_list::mutex()
{
return &mMutex;
}
RK_U32 mpp_list::get_key()
@@ -286,11 +272,6 @@ mpp_list::mpp_list(node_destructor func)
head(NULL),
count(0)
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mutex, &attr);
pthread_mutexattr_destroy(&attr);
destroy = func;
head = (mpp_list_node*)malloc(sizeof(mpp_list_node));
if (NULL == head) {
@@ -306,7 +287,6 @@ mpp_list::~mpp_list()
if (head) free(head);
head = NULL;
destroy = NULL;
pthread_mutex_destroy(&mutex);
}
#if BUILD_RK_LIST_TEST