diff --git a/inc/mpp_task.h b/inc/mpp_task.h index 8adb7705..8effcfc5 100644 --- a/inc/mpp_task.h +++ b/inc/mpp_task.h @@ -42,7 +42,14 @@ */ /* - * mpp has two ports: input and output + * One mpp task queue has two ports: input and output + * + * The whole picture is: + * Top layer mpp has two ports: mpp_input_port and mpp_output_port + * But internally these two ports belongs to two task queue. + * The mpp_input_port is the mpp_input_task_queue's input port. + * The mpp_output_port is the mpp_output_task_queue's output port. + * * Each port uses its task queue to communication */ typedef enum { @@ -175,11 +182,46 @@ typedef enum { */ /* NOTE: use index rather then handle to descripbe task */ typedef void* MppTask; +typedef void* MppPort; +typedef void* MppTaskQueue; #ifdef __cplusplus extern "C" { #endif +/* + * Mpp task queue function: + * + * mpp_task_queue_init - create task queue structure + * mpp_task_queue_deinit - destory task queue structure + * mpp_task_queue_get_port - return input or output port of task queue + * + * Typical work flow, task mpp_dec for example: + * + * 1. Mpp layer creates one task queue in order to connect mpp input and mpp_dec input. + * 2. Mpp layer setups the task count in task queue input port. + * 3. Get input port from the task queue and assign to mpp input as mpp_input_port. + * 4. Get output port from the task queue and assign to mpp_dec input as dec_input_port. + * 5. Let the loop start. + * a. mpi user will dequeue task from mpp_input_port. + * b. mpi user will setup task. + * c. mpi user will enqueue task back to mpp_input_port. + * d. task will automatically transfer to dec_input_port. + * e. mpp_dec will dequeue task from dec_input_port. + * f. mpp_dec will process task. + * g. mpp_dec will enqueue task back to dec_input_port. + * h. task will automatically transfer to mpp_input_port. + * 6. Stop the loop. All tasks must be return to input port with idle status. + * 6. Mpp layer destory the task queue. + */ +MPP_RET mpp_task_queue_init(MppTaskQueue *queue); +MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count); +MPP_RET mpp_task_queue_deinit(MppTaskQueue queue); +MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type); + +MPP_RET mpp_port_dequeue(MppPort port, MppTask *task); +MPP_RET mpp_port_enqueue(MppPort port, MppTask task); + MPP_RET mpp_task_meta_set_s32(MppTask task, MppMetaKey key, RK_S32 val); MPP_RET mpp_task_meta_set_s64(MppTask task, MppMetaKey key, RK_S64 val); MPP_RET mpp_task_meta_set_ptr(MppTask task, MppMetaKey key, void *val); diff --git a/mpp/base/inc/mpp_task_impl.h b/mpp/base/inc/mpp_task_impl.h index 6fe60670..24633b9f 100644 --- a/mpp/base/inc/mpp_task_impl.h +++ b/mpp/base/inc/mpp_task_impl.h @@ -52,19 +52,17 @@ * */ typedef enum MppTaskStatus_e { - MPP_EXTERNAL_QUEUE, /* in external queue and ready for external dequeue */ - MPP_EXTERNAL_HOLD, /* dequeued and hold by external user, user will config */ - MPP_INTERNAL_QUEUE, /* in mpp internal work queue and ready for mpp dequeue */ - MPP_INTERNAL_HOLD, /* dequeued and hold by mpp internal worker, mpp is processing */ - MPP_TASK_BUTT, + MPP_INPUT_PORT, /* in external queue and ready for external dequeue */ + MPP_INPUT_HOLD, /* dequeued and hold by external user, user will config */ + MPP_OUTPUT_PORT, /* in mpp internal work queue and ready for mpp dequeue */ + MPP_OUTPUT_HOLD, /* dequeued and hold by mpp internal worker, mpp is processing */ + MPP_TASK_STATUS_BUTT, } MppTaskStatus; -typedef void* MppPort; - typedef struct MppTaskImpl_t { const char *name; struct list_head list; - MppPort *port; + MppTaskQueue *queue; RK_S32 index; MppTaskStatus status; @@ -77,22 +75,6 @@ extern "C" { MPP_RET check_mpp_task_name(MppTask task); -/* - * mpp_port_init: - * initialize port with task count and initial status - * group - return port pointer - * type - initial queue for all tasks - * task_count - total task count for this task group - */ -MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count); -MPP_RET mpp_port_deinit(MppPort port); - -MPP_RET mpp_port_can_dequeue(MppPort port); -MPP_RET mpp_port_can_enqueue(MppPort port); - -MPP_RET mpp_port_dequeue(MppPort port, MppTask *task); -MPP_RET mpp_port_enqueue(MppPort port, MppTask task); - #ifdef __cplusplus } #endif diff --git a/mpp/base/mpp_task_impl.cpp b/mpp/base/mpp_task_impl.cpp index d49b9877..33911c73 100644 --- a/mpp/base/mpp_task_impl.cpp +++ b/mpp/base/mpp_task_impl.cpp @@ -25,15 +25,32 @@ #define MAX_TASK_COUNT 8 -typedef struct MppTaskGroupImpl_t { - MppPortType type; - RK_S32 task_count; +typedef struct MppTaskStatusInfo_t { + struct list_head list; + RK_S32 count; + MppTaskStatus status; +} MppTaskStatusInfo; + +typedef struct MppTaskQueueImpl_t { Mutex *lock; + RK_S32 task_count; + + // two ports inside of task queue + MppPort input; + MppPort output; MppTaskImpl *tasks; - struct list_head list[MPP_TASK_BUTT]; - RK_S32 count[MPP_TASK_BUTT]; + MppTaskStatusInfo info[MPP_TASK_STATUS_BUTT]; +} MppTaskQueueImpl; + +typedef struct MppPortImpl_t { + MppPortType type; + MppTaskQueueImpl *queue; + + MppTaskStatus status_curr; + MppTaskStatus next_on_dequeue; + MppTaskStatus next_on_enqueue; } MppPortImpl; static const char *module_name = MODULE_TAG; @@ -53,22 +70,109 @@ MPP_RET check_mpp_task_name(MppTask task) return MPP_NOK; } -MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count) +static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port) { - if (NULL == port || type >= MPP_PORT_BUTT || task_count > MAX_TASK_COUNT) { - mpp_err_f("invalid input port %p type %d count %d\n", port, type, task_count); - return MPP_ERR_UNKNOW; + MppPortImpl *impl = mpp_malloc(MppPortImpl, 1); + if (NULL == impl) { + mpp_err_f("failed to malloc MppPort type %d\n", type); + return MPP_ERR_MALLOC; } - MppPortImpl *p = NULL; + impl->type = type; + impl->queue = queue; + + if (MPP_PORT_INPUT == type) { + impl->status_curr = MPP_INPUT_PORT; + impl->next_on_dequeue = MPP_INPUT_HOLD; + impl->next_on_enqueue = MPP_OUTPUT_PORT; + } else { + impl->status_curr = MPP_OUTPUT_PORT; + impl->next_on_dequeue = MPP_OUTPUT_HOLD; + impl->next_on_enqueue = MPP_INPUT_PORT; + } + + *port = (MppPort *)impl; + + return MPP_OK; +} + +static MPP_RET mpp_port_deinit(MppPort port) +{ + mpp_free(port); + return MPP_OK; +} + +MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) +{ + MppPortImpl *port_impl = (MppPortImpl *)port; + MppTaskQueueImpl *queue = port_impl->queue; + + AutoMutex auto_lock(queue->lock); + MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr]; + MppTaskStatusInfo *next = &queue->info[port_impl->next_on_dequeue]; + + *task = NULL; + if (curr->count == 0) { + mpp_assert(list_empty(&curr->list)); + return MPP_OK; + } + + MppTaskImpl *task_impl = list_entry(curr->list.next, MppTaskImpl, list); + MppTask p = (MppTask)task_impl; + check_mpp_task_name(p); + list_del_init(&task_impl->list); + curr->count--; + mpp_assert(curr->count >= 0); + + list_add_tail(&task_impl->list, &next->list); + next->count++; + task_impl->status = next->status; + + *task = p; + + return MPP_OK; +} + +MPP_RET mpp_port_enqueue(MppPort port, MppTask task) +{ + MppTaskImpl *task_impl = (MppTaskImpl *)task; + MppPortImpl *port_impl = (MppPortImpl *)port; + MppTaskQueueImpl *queue = port_impl->queue; + check_mpp_task_name(task); + + mpp_assert(task_impl->queue == (MppTaskQueue *)queue); + mpp_assert(task_impl->status == port_impl->next_on_dequeue); + + AutoMutex auto_lock(queue->lock); + MppTaskStatusInfo *curr = &queue->info[task_impl->status]; + MppTaskStatusInfo *next = &queue->info[port_impl->next_on_enqueue]; + + list_del_init(&task_impl->list); + curr->count--; + list_add_tail(&task_impl->list, &next->list); + next->count++; + task_impl->status = next->status; + + return MPP_OK; +} + +MPP_RET mpp_task_queue_init(MppTaskQueue *queue) +{ + if (NULL == queue) { + mpp_err_f("invalid NULL input\n"); + return MPP_ERR_NULL_PTR; + } + + MppTaskQueueImpl *p = NULL; MppTaskImpl *tasks = NULL; Mutex *lock = NULL; - MppTaskStatus status = (type == MPP_PORT_INPUT) ? (MPP_EXTERNAL_HOLD) : (MPP_INTERNAL_HOLD); do { - p = mpp_calloc(MppPortImpl, 1); + RK_S32 i; + + p = mpp_calloc(MppTaskQueueImpl, 1); if (NULL == p) { - mpp_err_f("malloc port failed\n"); + mpp_err_f("malloc queue failed\n"); break; } lock = new Mutex(); @@ -76,30 +180,25 @@ MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count) mpp_err_f("new lock failed\n"); break;; } - tasks = mpp_calloc(MppTaskImpl, task_count); - if (NULL == tasks) { - mpp_err_f("malloc tasks list failed\n"); - break;; + + for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) { + INIT_LIST_HEAD(&p->info[i].list); + p->info[i].count = 0; + p->info[i].status = (MppTaskStatus)i; } - p->type = type; - p->task_count = task_count; p->lock = lock; p->tasks = tasks; - for (RK_U32 i = 0; i < MPP_TASK_BUTT; i++) - INIT_LIST_HEAD(&p->list[i]); + if (mpp_port_init(p, MPP_PORT_INPUT, &p->input)) + break; - for (RK_S32 i = 0; i < task_count; i++) { - setup_mpp_task_name(&tasks[i]); - INIT_LIST_HEAD(&tasks[i].list); - tasks[i].index = i; - tasks[i].port = port; - tasks[i].status = status; - list_add_tail(&tasks[i].list, &p->list[status]); - p->count[status]++; + if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) { + mpp_port_deinit(p->input); + break; } - *port = p; + + *queue = p; return MPP_OK; } while (0); @@ -110,133 +209,79 @@ MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count) if (tasks) mpp_free(tasks); - *port = NULL; + *queue = NULL; return MPP_NOK; } -MPP_RET mpp_port_deinit(MppPort port) +MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count) { - if (NULL == port) { - mpp_err_f("found NULL input port\n"); + MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue; + AutoMutex auto_lock(impl->lock); + + // NOTE: queue can only be setup once + mpp_assert(impl->tasks == NULL); + mpp_assert(impl->task_count == 0); + MppTaskImpl *tasks = mpp_calloc(MppTaskImpl, task_count); + if (NULL == tasks) { + mpp_err_f("malloc tasks list failed\n"); + return MPP_ERR_MALLOC; + } + + impl->tasks = tasks; + impl->task_count = task_count; + + MppTaskStatusInfo *info = &impl->info[MPP_INPUT_PORT]; + + for (RK_S32 i = 0; i < task_count; i++) { + setup_mpp_task_name(&tasks[i]); + INIT_LIST_HEAD(&tasks[i].list); + tasks[i].index = i; + tasks[i].queue = (MppTaskQueue *)queue; + tasks[i].status = MPP_INPUT_PORT; + mpp_meta_get(&tasks[i].meta); + + list_add_tail(&tasks[i].list, &info->list); + info->count++; + } + return MPP_OK; +} + +MPP_RET mpp_task_queue_deinit(MppTaskQueue queue) +{ + if (NULL == queue) { + mpp_err_f("found NULL input queue\n"); return MPP_ERR_NULL_PTR; } - MppPortImpl *p = (MppPortImpl *)port; - if (p->tasks) + MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue; + if (p->input) { + mpp_port_deinit(p->input); + p->input = NULL; + } + if (p->output) { + mpp_port_deinit(p->output); + p->output = NULL; + } + if (p->tasks) { + for (RK_S32 i = 0; i < p->task_count; i++) { + mpp_meta_put(p->tasks[i].meta); + } mpp_free(p->tasks); + } if (p->lock) delete p->lock; mpp_free(p); return MPP_OK; } -MPP_RET mpp_port_can_dequeue(MppPort port) +MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type) { - if (NULL == port) { - mpp_err_f("invalid input port %p\n", port); - return MPP_ERR_NULL_PTR; - } - - MppPortImpl *p = (MppPortImpl *)port; - AutoMutex auto_lock(p->lock); - MppTaskStatus status_curr; - MppTaskStatus status_next; - - return MPP_OK; -} - -MPP_RET mpp_port_can_enqueue(MppPort port) -{ - return MPP_OK; -} - -static MppTaskImpl* mpp_task_get_by_status(MppPortImpl *p, MppTaskStatus status) -{ - struct list_head *list = &p->list[status]; - if (list_empty(list)) + if (NULL == queue || type >= MPP_PORT_BUTT) { + mpp_err_f("invalid input queue %p type %d\n", queue, type); return NULL; - - MppTaskImpl *task = list_entry(list->next, MppTaskImpl, list); - mpp_assert(task->status == status); - list_del_init(&task->list); - return task; -} - -static MPP_RET mpp_task_put_by_status(MppPortImpl *p, MppTaskStatus status, MppTaskImpl* task) -{ - task->status = status; - list_add_tail(&task->list, &p->list[status]); - return MPP_OK; -} - -MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) -{ - if (NULL == port || NULL == task) { - mpp_err_f("invalid input port %p task %d\n", port, task); - return MPP_ERR_UNKNOW; } - MppPortImpl *p = (MppPortImpl *)port; - AutoMutex auto_lock(p->lock); - MppTaskStatus status_curr; - MppTaskStatus status_next; - - switch (p->type) { - case MPP_PORT_INPUT : { - status_curr = MPP_EXTERNAL_QUEUE; - status_next = MPP_EXTERNAL_HOLD; - } break; - case MPP_PORT_OUTPUT : { - status_curr = MPP_INTERNAL_QUEUE; - status_next = MPP_INTERNAL_HOLD; - } break; - default : { - mpp_err_f("invalid queue type: %d\n", p->type); - return MPP_NOK; - } break; - } - - MppTaskImpl *task_op = mpp_task_get_by_status(p, status_curr); - if (NULL == task_op) - return MPP_NOK; - - mpp_task_put_by_status(p, status_next, task_op); - *task = (MppTask)task_op; - return MPP_OK; -} - -MPP_RET mpp_port_enqueue(MppPort port, MppTask task) -{ - if (NULL == port || NULL == task) { - mpp_err_f("invalid input port %p task %d\n", port, task); - return MPP_ERR_UNKNOW; - } - - MppPortImpl *p = (MppPortImpl *)port; - AutoMutex auto_lock(p->lock); - MppTaskStatus status_curr; - MppTaskStatus status_next; - - switch (p->type) { - case MPP_PORT_INPUT : { - status_curr = MPP_INTERNAL_HOLD; - status_next = MPP_EXTERNAL_QUEUE; - } break; - case MPP_PORT_OUTPUT : { - status_curr = MPP_EXTERNAL_HOLD; - status_next = MPP_INTERNAL_QUEUE; - } break; - default : { - mpp_err_f("invalid queue type: %d\n", p->type); - return MPP_NOK; - } break; - } - - MppTaskImpl *task_op = (MppTaskImpl *)task; - mpp_assert(task_op->status == status_curr); - list_del_init(&task_op->list); - - mpp_task_put_by_status(p, status_next, task_op); - return MPP_OK; + MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue; + return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output); } diff --git a/mpp/codec/inc/mpp_enc.h b/mpp/codec/inc/mpp_enc.h index f863ec2e..a97af8cd 100644 --- a/mpp/codec/inc/mpp_enc.h +++ b/mpp/codec/inc/mpp_enc.h @@ -17,12 +17,19 @@ #ifndef __MPP_ENC_H__ #define __MPP_ENC_H__ -#include "rk_mpi.h" +#include "mpp_hal.h" typedef struct MppEnc_t MppEnc; struct MppEnc_t { MppCodingType coding; + + MppHal hal; + + // common resource + MppBufSlots frame_slots; + MppBufSlots packet_slots; + HalTaskGroup tasks; }; #ifdef __cplusplus diff --git a/mpp/codec/mpp_enc.cpp b/mpp/codec/mpp_enc.cpp index c3550ace..c9300132 100644 --- a/mpp/codec/mpp_enc.cpp +++ b/mpp/codec/mpp_enc.cpp @@ -28,27 +28,43 @@ void *mpp_enc_control_thread(void *data) { Mpp *mpp = (Mpp*)data; - MppThread *thd_enc = mpp->mThreadCodec; - mpp_list *packets = mpp->mPackets; - mpp_list *frames = mpp->mFrames; - MppFrameImpl frame; - MppPacket packet; - size_t size = SZ_1M; - char *buf = mpp_malloc(char, size); + MppThread *thd_enc = mpp->mThreadCodec; + MppPort input = mpp_task_queue_get_port(mpp->mInputTaskQueue, MPP_PORT_OUTPUT); + MppPort output = mpp_task_queue_get_port(mpp->mOutputTaskQueue, MPP_PORT_INPUT); + MppTask task = NULL; + MPP_RET ret = MPP_OK; while (MPP_THREAD_RUNNING == thd_enc->get_status()) { - AutoMutex auto_lock(frames->mutex()); - if (frames->list_size()) { - frames->del_at_head(&frame, sizeof(frame)); + thd_enc->lock(); + ret = mpp_port_dequeue(input, &task); + if (ret || NULL == task) + thd_enc->wait(); + thd_enc->unlock(); - mpp_packet_init(&packet, buf, size); - packets->lock(); - packets->add_at_tail(&packet, sizeof(packet)); - packets->signal(); - packets->unlock(); + if (task) { + MppFrame frame = NULL; + MppPacket packet = NULL; + // task process here + + + // enqueue task back to input input + mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame); + mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet); + + mpp_port_enqueue(input, task); + task = NULL; + + // send finished task to output port + mpp_port_dequeue(output, &task); + + mpp_task_meta_set_frame(task, MPP_META_KEY_INPUT_FRM, frame); + mpp_task_meta_set_packet(task, MPP_META_KEY_OUTPUT_PKT, packet); + + // setup output task here + mpp_port_enqueue(output, task); + task = NULL; } } - mpp_free(buf); return NULL; } @@ -123,7 +139,29 @@ MPP_RET mpp_enc_init(MppEnc **enc, MppCodingType coding) return MPP_ERR_NULL_PTR; } + MPP_RET ret = MPP_NOK; + MppHal hal = NULL; + + MppHalCfg hal_cfg = { + MPP_CTX_ENC, + coding, + HAL_MODE_LIBVPU, + HAL_RKVDEC, + NULL, + NULL, + NULL, + 2, + 0, + NULL, + }; + + ret = mpp_hal_init(&hal, &hal_cfg); + if (ret) { + mpp_err_f("could not init hal\n"); + } + p->coding = coding; + *enc = p; return MPP_OK; @@ -136,6 +174,10 @@ MPP_RET mpp_enc_deinit(MppEnc *enc) return MPP_ERR_NULL_PTR; } + MPP_RET ret = mpp_hal_deinit(enc->hal); + if (ret) { + mpp_err_f("mpp enc hal deinit failed\n"); + } mpp_free(enc); return MPP_OK; } diff --git a/mpp/mpi.cpp b/mpp/mpi.cpp index dbc9aa9e..2fc3f6a1 100644 --- a/mpp/mpi.cpp +++ b/mpp/mpi.cpp @@ -43,6 +43,7 @@ static MppCodingTypeInfo support_list[] = { { MPP_CTX_DEC, MPP_VIDEO_CodingVP8, "dec", "vp8", }, { MPP_CTX_DEC, MPP_VIDEO_CodingVP9, "dec", "VP9", }, { MPP_CTX_DEC, MPP_VIDEO_CodingAVS, "dec", "avs+", }, + { MPP_CTX_ENC, MPP_VIDEO_CodingAVC, "enc", "h.264/AVC", }, }; #define check_mpp_ctx(ctx) _check_mpp_ctx(ctx, __FUNCTION__) @@ -222,6 +223,8 @@ static MPP_RET mpi_dequeue(MppCtx ctx, MppPortType type, MppTask *task) return MPP_ERR_UNKNOW; } + ret = p->ctx->dequeue(type, task); + MPI_FUNCTION_LEAVE(); return ret; } @@ -235,11 +238,13 @@ static MPP_RET mpi_enqueue(MppCtx ctx, MppPortType type, MppTask task) if (ret) return ret; - if (type >= MPP_PORT_BUTT || task < 0) { - mpp_err_f("invalid input type %d task %d\n", type, task); + if (type >= MPP_PORT_BUTT || NULL == task) { + mpp_err_f("invalid input type %d task %p\n", type, task); return MPP_ERR_UNKNOW; } + ret = p->ctx->enqueue(type, task); + MPI_FUNCTION_LEAVE(); return ret; } diff --git a/mpp/mpp.cpp b/mpp/mpp.cpp index 1c8485d3..7830cd31 100644 --- a/mpp/mpp.cpp +++ b/mpp/mpp.cpp @@ -25,9 +25,9 @@ #include "mpp_dec.h" #include "mpp_enc.h" #include "mpp_hal.h" +#include "mpp_task_impl.h" #include "mpp_buffer_impl.h" #include "mpp_frame_impl.h" -#include "mpp_packet.h" #include "mpp_packet_impl.h" #define MPP_TEST_FRAME_SIZE SZ_1M @@ -45,6 +45,10 @@ Mpp::Mpp() mTaskGetCount(0), mPacketGroup(NULL), mFrameGroup(NULL), + mInputPort(NULL), + mOutputPort(NULL), + mInputTaskQueue(NULL), + mOutputTaskQueue(NULL), mThreadCodec(NULL), mThreadHal(NULL), mDec(NULL), @@ -91,7 +95,6 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding) mpp_buffer_group_get_internal(&mPacketGroup, MPP_BUFFER_TYPE_ION); mpp_buffer_group_limit_config(mPacketGroup, 0, 3); - } break; case MPP_CTX_ENC : { mFrames = new mpp_list((node_destructor)NULL); @@ -110,6 +113,14 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding) } break; } + mpp_task_queue_init(&mInputTaskQueue); + mpp_task_queue_init(&mOutputTaskQueue); + mpp_task_queue_setup(mInputTaskQueue, 4); + mpp_task_queue_setup(mOutputTaskQueue, 4); + + mInputPort = mpp_task_queue_get_port(mInputTaskQueue, MPP_PORT_INPUT); + mOutputPort = mpp_task_queue_get_port(mOutputTaskQueue, MPP_PORT_OUTPUT); + if (mFrames && mPackets && (mDec || mEnc) && mThreadCodec && mThreadHal && @@ -150,6 +161,19 @@ void Mpp::clear() delete mThreadHal; mThreadHal = NULL; } + + if (mInputTaskQueue) { + mpp_task_queue_deinit(mInputTaskQueue); + mInputTaskQueue = NULL; + } + if (mOutputTaskQueue) { + mpp_task_queue_deinit(mOutputTaskQueue); + mOutputTaskQueue = NULL; + } + + mInputPort = NULL; + mOutputPort = NULL; + if (mDec || mEnc) { if (mType == MPP_CTX_DEC) { mpp_dec_deinit(mDec); @@ -273,6 +297,63 @@ MPP_RET Mpp::get_packet(MppPacket *packet) return MPP_OK; } +MPP_RET Mpp::dequeue(MppPortType type, MppTask *task) +{ + if (!mInitDone) + return MPP_NOK; + + MPP_RET ret = MPP_NOK; + AutoMutex autoLock(mPortLock); + MppTaskQueue port = NULL; + switch (type) { + case MPP_PORT_INPUT : { + port = mInputPort; + } break; + case MPP_PORT_OUTPUT : { + port = mOutputPort; + } break; + default : { + } break; + } + + if (port) + ret = mpp_port_dequeue(port, task); + + return ret; +} + +MPP_RET Mpp::enqueue(MppPortType type, MppTask task) +{ + if (!mInitDone) + return MPP_NOK; + + MPP_RET ret = MPP_NOK; + AutoMutex autoLock(mPortLock); + MppTaskQueue port = NULL; + switch (type) { + case MPP_PORT_INPUT : { + port = mInputPort; + } break; + case MPP_PORT_OUTPUT : { + port = mOutputPort; + } break; + default : { + } break; + } + + if (port) { + mThreadCodec->lock(); + ret = mpp_port_enqueue(port, task); + if (MPP_OK == ret) { + // if enqueue success wait up thread + mThreadCodec->signal(); + } + mThreadCodec->unlock(); + } + + return ret; +} + MPP_RET Mpp::control(MpiCmd cmd, MppParam param) { switch (cmd) { diff --git a/mpp/mpp.h b/mpp/mpp.h index 8a5c794f..069929e2 100644 --- a/mpp/mpp.h +++ b/mpp/mpp.h @@ -20,6 +20,7 @@ #include "mpp_list.h" #include "mpp_dec.h" #include "mpp_enc.h" +#include "mpp_task.h" #define MPP_DBG_FUNCTION (0x00000001) #define MPP_DBG_PACKET (0x00000002) @@ -70,6 +71,9 @@ public: MPP_RET put_frame(MppFrame frame); MPP_RET get_packet(MppPacket *packet); + MPP_RET dequeue(MppPortType type, MppTask *task); + MPP_RET enqueue(MppPortType type, MppTask task); + MPP_RET reset(); MPP_RET control(MpiCmd cmd, MppParam param); @@ -94,6 +98,16 @@ public: MppBufferGroup mPacketGroup; MppBufferGroup mFrameGroup; + /* + * Mpp task queue for advance task mode + */ + Mutex mPortLock; + MppPort mInputPort; + MppPort mOutputPort; + + MppTaskQueue mInputTaskQueue; + MppTaskQueue mOutputTaskQueue; + /* * There are two threads for each decoder/encoder: codec thread and hal thread * diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 784b3387..aa796231 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,3 +40,6 @@ add_mpp_test(mpi) # mpi decoder unit test add_mpp_test(mpi_dec) + +# mpi encoder unit test +add_mpp_test(mpi_enc) diff --git a/test/mpi_enc_test.c b/test/mpi_enc_test.c index 5edadcce..b4eb4e43 100644 --- a/test/mpi_enc_test.c +++ b/test/mpi_enc_test.c @@ -75,9 +75,6 @@ int mpi_enc_test(MpiEncTestCmd *cmd) MppBuffer frm_buf[MPI_ENC_IO_COUNT] = { NULL }; MppBuffer pkt_buf[MPI_ENC_IO_COUNT] = { NULL }; - MpiCmd mpi_cmd = MPP_CMD_BASE; - MppParam param = NULL; - // paramter for resource malloc RK_U32 width = cmd->width; RK_U32 height = cmd->height; @@ -162,6 +159,7 @@ int mpi_enc_test(MpiEncTestCmd *cmd) mpp_frame_set_hor_stride(frame, hor_stride); mpp_frame_set_ver_stride(frame, ver_stride); + i = 0; while (!pkt_eos) { MppTask task = NULL; RK_S32 index = i++; @@ -176,12 +174,13 @@ int mpi_enc_test(MpiEncTestCmd *cmd) if (read_size != frame_size || feof(fp_input)) { mpp_log("found last frame\n"); frm_eos = 1; + break; } mpp_frame_set_buffer(frame, frm_buf_in); mpp_frame_set_eos(frame, frm_eos); - mpp_packet_init_with_buffer(packet, pkt_buf_out); + mpp_packet_init_with_buffer(&packet, pkt_buf_out); ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); if (ret) { @@ -198,32 +197,42 @@ int mpi_enc_test(MpiEncTestCmd *cmd) goto MPP_TEST_OUT; } - msleep(200); + msleep(20); - ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); - if (ret) { - mpp_err("mpp task output dequeue failed\n"); - goto MPP_TEST_OUT; - } - - if (task) { - mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame); - mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet); - - // write packet to file here - { - void *ptr = mpp_packet_get_pos(packet); - size_t len = mpp_packet_get_length(packet); - fwrite(ptr, 1, len, fp_output); + do { + ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); + if (ret) { + mpp_err("mpp task output dequeue failed\n"); + goto MPP_TEST_OUT; } - mpp_packet_deinit(&packet); - } - ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - goto MPP_TEST_OUT; - } + if (task) { + MppFrame frame_out = NULL; + MppFrame packet_out = NULL; + + mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame_out); + mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet_out); + + mpp_assert(packet_out == packet); + mpp_assert(frame_out == frame); + if (packet) { + // write packet to file here + // void *ptr = mpp_packet_get_pos(packet); + // size_t len = mpp_packet_get_length(packet); + // fwrite(ptr, 1, len, fp_output); + mpp_packet_deinit(&packet); + } + mpp_log_f("encoded frame %d\n", frame_count); + frame_count++; + + ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); + if (ret) { + mpp_err("mpp task output enqueue failed\n"); + goto MPP_TEST_OUT; + } + break; + } + } while (1); } ret = mpi->reset(ctx); @@ -239,14 +248,19 @@ MPP_TEST_OUT: ctx = NULL; } + if (frame) { + mpp_frame_deinit(&frame); + frame = NULL; + } + for (i = 0; i < MPI_ENC_IO_COUNT; i++) { if (frm_buf[i]) { - mpp_buffer_put(&frm_buf[i]); + mpp_buffer_put(frm_buf[i]); frm_buf[i] = NULL; } if (pkt_buf[i]) { - mpp_buffer_put(&pkt_buf[i]); + mpp_buffer_put(pkt_buf[i]); pkt_buf[i] = NULL; } }