diff --git a/inc/mpp_task.h b/inc/mpp_task.h index 7d1a2fea..c91899eb 100644 --- a/inc/mpp_task.h +++ b/inc/mpp_task.h @@ -66,22 +66,25 @@ typedef enum { * mpp_init(type, coding, MPP_WORK_ASYNC) * * input thread - * a - dequeue(input, *task) - * b - task_set_item(packet/frame) - * c - enqueue(input, task) // when enqueue return the task is not done yet + * a - poll(input) + * b - dequeue(input, *task) + * c - task_set_item(packet/frame) + * d - enqueue(input, task) // when enqueue return the task is not done yet * * output thread - * a - dequeue(output, *task) - * b - task_get_item(frame/packet) - * c - enqueue(output, task) + * a - poll(output) + * b - dequeue(output, *task) + * c - task_get_item(frame/packet) + * d - enqueue(output, task) ****************************************************************************** * 2. sync mode * * mpp_init(type, coding, MPP_WORK_SYNC) * - * a - dequeue(input, *task) - * b - task_set_item(packet/frame) - * c - enqueue(task) // when enqueue return the task is finished + * a - poll(input) + * b - dequeue(input, *task) + * c - task_set_item(packet/frame) + * d - enqueue(task) // when enqueue return the task is finished ****************************************************************************** */ typedef enum { @@ -208,48 +211,11 @@ typedef enum { * task_enqueue(ctx, PORT_OUTPUT, task); */ /* NOTE: use index rather then handle to descripbe task */ -typedef void* MppPort; -typedef void* MppTaskQueue; #ifdef __cplusplus extern "C" { #endif -/* - * Mpp task queue function: - * - * mpp_task_queue_init - create task queue structure - * mpp_task_queue_deinit - destory task queue structure - * mpp_task_queue_get_port - return input or output port of task queue - * - * Typical work flow, task mpp_dec for example: - * - * 1. Mpp layer creates one task queue in order to connect mpp input and mpp_dec input. - * 2. Mpp layer setups the task count in task queue input port. - * 3. Get input port from the task queue and assign to mpp input as mpp_input_port. - * 4. Get output port from the task queue and assign to mpp_dec input as dec_input_port. - * 5. Let the loop start. - * a. mpi user will dequeue task from mpp_input_port. - * b. mpi user will setup task. - * c. mpi user will enqueue task back to mpp_input_port. - * d. task will automatically transfer to dec_input_port. - * e. mpp_dec will dequeue task from dec_input_port. - * f. mpp_dec will process task. - * g. mpp_dec will enqueue task back to dec_input_port. - * h. task will automatically transfer to mpp_input_port. - * 6. Stop the loop. All tasks must be return to input port with idle status. - * 6. Mpp layer destory the task queue. - */ -MPP_RET mpp_task_queue_init(MppTaskQueue *queue); -MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count); -MPP_RET mpp_task_queue_deinit(MppTaskQueue queue); -MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type); - -MPP_RET mpp_port_poll(MppPort port, MppPollType timeout); -MPP_RET mpp_port_dequeue(MppPort port, MppTask *task); -MPP_RET mpp_port_enqueue(MppPort port, MppTask task); -MPP_RET mpp_port_awake(MppPort port); - MPP_RET mpp_task_meta_set_s32(MppTask task, MppMetaKey key, RK_S32 val); MPP_RET mpp_task_meta_set_s64(MppTask task, MppMetaKey key, RK_S64 val); MPP_RET mpp_task_meta_set_ptr(MppTask task, MppMetaKey key, void *val); diff --git a/mpp/base/inc/mpp_task_impl.h b/mpp/base/inc/mpp_task_impl.h index a7648255..3a592cab 100644 --- a/mpp/base/inc/mpp_task_impl.h +++ b/mpp/base/inc/mpp_task_impl.h @@ -20,6 +20,9 @@ #include "mpp_list.h" #include "mpp_task.h" +typedef void* MppPort; +typedef void* MppTaskQueue; + /* * mpp task status transaction * @@ -75,6 +78,46 @@ extern "C" { MPP_RET check_mpp_task_name(MppTask task); +/* + * Mpp task queue function: + * + * mpp_task_queue_init - create task queue structure + * mpp_task_queue_deinit - destory task queue structure + * mpp_task_queue_get_port - return input or output port of task queue + * + * Typical work flow, task mpp_dec for example: + * + * 1. Mpp layer creates one task queue in order to connect mpp input and mpp_dec input. + * 2. Mpp layer setups the task count in task queue input port. + * 3. Get input port from the task queue and assign to mpp input as mpp_input_port. + * 4. Get output port from the task queue and assign to mpp_dec input as dec_input_port. + * 5. Let the loop start. + * a. mpi user will dequeue task from mpp_input_port. + * b. mpi user will setup task. + * c. mpi user will enqueue task back to mpp_input_port. + * d. task will automatically transfer to dec_input_port. + * e. mpp_dec will dequeue task from dec_input_port. + * f. mpp_dec will process task. + * g. mpp_dec will enqueue task back to dec_input_port. + * h. task will automatically transfer to mpp_input_port. + * 6. Stop the loop. All tasks must be return to input port with idle status. + * 6. Mpp layer destory the task queue. + */ +MPP_RET mpp_task_queue_init(MppTaskQueue *queue); +MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count); +MPP_RET mpp_task_queue_deinit(MppTaskQueue queue); +MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type); + +#define mpp_port_poll(port, timeout) _mpp_port_poll(__FUNCTION__, port, timeout) +#define mpp_port_dequeue(port, task) _mpp_port_dequeue(__FUNCTION__, port, task) +#define mpp_port_enqueue(port, task) _mpp_port_enqueue(__FUNCTION__, port, task) +#define mpp_port_awake(port) _mpp_port_awake(__FUNCTION__, port) + +MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout); +MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task); +MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task); +MPP_RET _mpp_port_awake(const char *caller, MppPort port); + #ifdef __cplusplus } #endif diff --git a/mpp/base/mpp_task_impl.cpp b/mpp/base/mpp_task_impl.cpp index 82dd79a7..1857efa2 100644 --- a/mpp/base/mpp_task_impl.cpp +++ b/mpp/base/mpp_task_impl.cpp @@ -122,7 +122,7 @@ static MPP_RET mpp_port_deinit(MppPort port) return MPP_OK; } -MPP_RET mpp_port_poll(MppPort port, MppPollType timeout) +MPP_RET _mpp_port_poll(const char *caller, MppPort port, MppPollType timeout) { MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; @@ -131,7 +131,7 @@ MPP_RET mpp_port_poll(MppPort port, MppPollType timeout) MppTaskStatusInfo *curr = NULL; MPP_RET ret = MPP_NOK; - mpp_task_dbg_func("enter port %p timeout %d\n", port, timeout); + mpp_task_dbg_func("caller %s enter port %p timeout %d\n", caller, port, timeout); if (!queue->ready) { mpp_err("try to query when %s queue is not ready\n", (port_impl->type == MPP_PORT_INPUT) ? @@ -172,11 +172,11 @@ MPP_RET mpp_port_poll(MppPort port, MppPollType timeout) } } RET: - mpp_task_dbg_func("leave port %p ret %d\n", port, ret); + mpp_task_dbg_func("caller %s leave port %p ret %d\n", caller, port, ret); return ret; } -MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) +MPP_RET _mpp_port_dequeue(const char *caller, MppPort port, MppTask *task) { MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; @@ -188,7 +188,7 @@ MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) AutoMutex auto_lock(queue->lock); MPP_RET ret = MPP_NOK; - mpp_task_dbg_func("enter port %p\n", port); + mpp_task_dbg_func("caller %s enter port %p\n", caller, port); if (!queue->ready) { mpp_err("try to dequeue when %s queue is not ready\n", @@ -200,6 +200,9 @@ MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) curr = &queue->info[port_impl->status_curr]; next = &queue->info[port_impl->next_on_dequeue]; + mpp_task_dbg_func("move port %p task %p %d -> %d\n", port, task, + port_impl->status_curr, port_impl->next_on_dequeue); + *task = NULL; if (curr->count == 0) { mpp_assert(list_empty(&curr->list)); @@ -221,12 +224,12 @@ MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) *task = p; ret = MPP_OK; RET: - mpp_task_dbg_func("leave port %p ret %d\n", port, ret); + mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, *task, ret); return ret; } -MPP_RET mpp_port_enqueue(MppPort port, MppTask task) +MPP_RET _mpp_port_enqueue(const char *caller, MppPort port, MppTask task) { MppTaskImpl *task_impl = (MppTaskImpl *)task; MppPortImpl *port_impl = (MppPortImpl *)port; @@ -237,7 +240,7 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task) AutoMutex auto_lock(queue->lock); MPP_RET ret = MPP_NOK; - mpp_task_dbg_func("enter port %p\n", port); + mpp_task_dbg_func("caller %s enter port %p task %p\n", caller, port, task); if (!queue->ready) { mpp_err("try to enqueue when %s queue is not ready\n", @@ -254,6 +257,9 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task) curr = &queue->info[task_impl->status]; next = &queue->info[port_impl->next_on_enqueue]; + mpp_task_dbg_func("move port %p task %p %d -> %d\n", port, task, + task_impl->status, port_impl->next_on_enqueue); + list_del_init(&task_impl->list); curr->count--; list_add_tail(&task_impl->list, &next->list); @@ -264,17 +270,17 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task) mpp_task_dbg_func("signal port %p\n", next); ret = MPP_OK; RET: - mpp_task_dbg_func("leave port %p ret %d\n", port, ret); + mpp_task_dbg_func("caller %s leave port %p task %p ret %d\n", caller, port, task, ret); return ret; } -MPP_RET mpp_port_awake(MppPort port) +MPP_RET _mpp_port_awake(const char *caller, MppPort port) { if (port == NULL) return MPP_NOK; - mpp_task_dbg_func("enter port %p\n", port); + mpp_task_dbg_func("caller %s enter port %p\n", caller, port); MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; MppTaskStatusInfo *curr = NULL; @@ -286,7 +292,7 @@ MPP_RET mpp_port_awake(MppPort port) } } - mpp_task_dbg_func("leave port %p\n", port); + mpp_task_dbg_func("caller %s leave port %p\n", caller, port); return MPP_OK; } diff --git a/mpp/inc/mpp.h b/mpp/inc/mpp.h index d467c1a9..03c0b48d 100644 --- a/mpp/inc/mpp.h +++ b/mpp/inc/mpp.h @@ -18,6 +18,7 @@ #define __MPP_H__ #include "mpp_queue.h" +#include "mpp_task_impl.h" #include "mpp_dec.h" #include "mpp_enc.h"