From d8ed1f4eb11c12f00ad066644bf35360bb5dd862 Mon Sep 17 00:00:00 2001 From: Herman Chen Date: Tue, 20 Dec 2016 15:10:17 +0800 Subject: [PATCH] [mpi]: Add poll interface to mpi 1. Add poll with timeout. Support three mode: block/non-block/timeout. 2. Change control MPP_SET_INPUT_BLOCK and MPP_SET_OUTPUT_BLOCK. 3. Remove msleep in most mpp interface. Change-Id: I39d9a261b6f1da66c6cb944abd71d1e7f4928d2d Signed-off-by: Herman Chen --- inc/mpp_task.h | 17 ++- inc/rk_mpi.h | 13 +- mpp/base/mpp_task_impl.cpp | 257 ++++++++++++++++++++++++--------- mpp/codec/mpp_dec.cpp | 7 +- mpp/codec/mpp_enc.cpp | 1 + mpp/legacy/vpu_api_legacy.cpp | 260 ++++++++++++++++------------------ mpp/legacy/vpu_api_legacy.h | 3 - mpp/mpi.cpp | 27 ++++ mpp/mpp.cpp | 187 +++++++++++++----------- mpp/mpp.h | 8 +- test/mpi_dec_test.c | 90 ++++++------ test/mpi_enc_test.c | 114 +++++++-------- test/mpi_rc_test.c | 34 ++--- 13 files changed, 599 insertions(+), 419 deletions(-) diff --git a/inc/mpp_task.h b/inc/mpp_task.h index 4fb83fb5..aa2c8175 100644 --- a/inc/mpp_task.h +++ b/inc/mpp_task.h @@ -90,6 +90,21 @@ typedef enum { MPP_TASK_WORK_MODE_BUTT, } MppTaskWorkMode; +/* + * Mpp port pull type + * + * MPP_POLL_BLOCK - for block poll + * MPP_POLL_NON_BLOCK - for non-block poll + * small than MPP_POLL_MAX - for poll with timeout in ms + * small than MPP_POLL_BUTT or larger than MPP_POLL_MAX is invalid value + */ +typedef enum { + MPP_POLL_BUTT = -2, + MPP_POLL_BLOCK = -1, + MPP_POLL_NON_BLOCK = 0, + MPP_POLL_MAX = 1000, +} MppPollType; + /* * MppTask is descriptor of a task which send to mpp for process * mpp can support different type of work mode, for example: @@ -219,7 +234,7 @@ MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count); MPP_RET mpp_task_queue_deinit(MppTaskQueue queue); MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type); -MPP_RET mpp_port_can_dequeue(MppPort port); +MPP_RET mpp_port_poll(MppPort port, MppPollType timeout); MPP_RET mpp_port_dequeue(MppPort port, MppTask *task); MPP_RET mpp_port_enqueue(MppPort port, MppTask task); diff --git a/inc/rk_mpi.h b/inc/rk_mpi.h index a73d46ac..919ead58 100644 --- a/inc/rk_mpi.h +++ b/inc/rk_mpi.h @@ -229,12 +229,19 @@ typedef struct MppApi_t { MPP_RET (*isp_get_frame)(MppCtx ctx, MppFrame *frame); // advance data flow interface + /** + * @brief poll port for dequeue + * @param ctx The context of mpp + * @param type input port or output port which are both for data transaction + * @return 0 for success there is valid task for dequeue, others for failure + */ + MPP_RET (*poll)(MppCtx ctx, MppPortType type, MppPollType timeout); /** * @brief dequeue MppTask * @param ctx The context of mpp * @param type input port or output port which are both for data transaction * @param task MppTask which is sent to mpp for process - * @return 0 for success, oters for failure + * @return 0 for success, others for failure */ MPP_RET (*dequeue)(MppCtx ctx, MppPortType type, MppTask *task); /** @@ -242,7 +249,7 @@ typedef struct MppApi_t { * @param ctx The context of mpp * @param type input port or output port which are both for data transaction * @param task MppTask which is sent to mpp for process - * @return 0 for success, oters for failure + * @return 0 for success, others for failure */ MPP_RET (*enqueue)(MppCtx ctx, MppPortType type, MppTask task); @@ -258,7 +265,7 @@ typedef struct MppApi_t { * @param ctx The context of mpp * @param cmd The mpi command * @param param The mpi command parameter - * @return 0 for success, oters for failure + * @return 0 for success, others for failure */ MPP_RET (*control)(MppCtx ctx, MpiCmd cmd, MppParam param); diff --git a/mpp/base/mpp_task_impl.cpp b/mpp/base/mpp_task_impl.cpp index 3cb6ad22..8c7dd73d 100644 --- a/mpp/base/mpp_task_impl.cpp +++ b/mpp/base/mpp_task_impl.cpp @@ -18,6 +18,7 @@ #include +#include "mpp_env.h" #include "mpp_log.h" #include "mpp_mem.h" @@ -25,15 +26,25 @@ #define MAX_TASK_COUNT 8 +#define MPP_TASK_DBG_FUNCTION (0x00000001) + +#define mpp_task_dbg(flag, fmt, ...) _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__) +#define mpp_task_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__) + +#define mpp_task_dbg_func(fmt, ...) mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__) + typedef struct MppTaskStatusInfo_t { struct list_head list; RK_S32 count; MppTaskStatus status; + Condition *cond; } MppTaskStatusInfo; typedef struct MppTaskQueueImpl_t { Mutex *lock; RK_S32 task_count; + RK_S32 ready; // flag for deinit + Condition *finish_done; // condition for deinit done // two ports inside of task queue MppPort input; @@ -55,6 +66,8 @@ typedef struct MppPortImpl_t { static const char *module_name = MODULE_TAG; +RK_U32 mpp_task_debug = 0; + void setup_mpp_task_name(MppTaskImpl *task) { task->name = module_name; @@ -78,6 +91,8 @@ static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort return MPP_ERR_MALLOC; } + mpp_task_dbg_func("enter queue %p type %d\n", queue, type); + impl->type = type; impl->queue = queue; @@ -93,49 +108,107 @@ static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port = (MppPort *)impl; + mpp_task_dbg_func("leave queue %p port %p\n", queue, impl); + return MPP_OK; } static MPP_RET mpp_port_deinit(MppPort port) { + mpp_task_dbg_func("enter port %p\n", port); mpp_free(port); + mpp_task_dbg_func("leave\n"); return MPP_OK; } -MPP_RET mpp_port_can_dequeue(MppPort port) +MPP_RET mpp_port_poll(MppPort port, MppPollType timeout) { MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; AutoMutex auto_lock(queue->lock); - MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr]; + MppTaskStatusInfo *curr = NULL; + MPP_RET ret = MPP_NOK; - if (curr->count) { - mpp_assert(!list_empty(&curr->list)); - return MPP_OK; + mpp_task_dbg_func("enter port %p timeout %d\n", port, timeout); + if (!queue->ready) { + mpp_err("try to query when %s queue is not ready\n", + (port_impl->type == MPP_PORT_INPUT) ? + ("input") : ("output")); + goto RET; } - mpp_assert(list_empty(&curr->list)); - return MPP_NOK; + curr = &queue->info[port_impl->status_curr]; + if (curr->count) { + mpp_assert(!list_empty(&curr->list)); + ret = MPP_OK; + } else { + mpp_assert(list_empty(&curr->list)); + + /* timeout + * zero - non-block + * negtive - block + * positive - timeout value + */ + if (timeout != MPP_POLL_NON_BLOCK) { + mpp_assert(curr->cond); + Condition *cond = curr->cond; + RK_S32 wait_ret = 0; + if (timeout == MPP_POLL_BLOCK) { + mpp_task_dbg_func("port %p block wait start\n", port); + wait_ret = cond->wait(queue->lock); + mpp_task_dbg_func("port %p block wait done ret %d\n", port, wait_ret); + } else { + RK_S64 time = ((RK_S64)(timeout / 1000) << 32) + (timeout % 1000); + mpp_task_dbg_func("port %p timed wait start %d\n", port, timeout); + wait_ret = cond->timedwait(queue->lock, time); + mpp_task_dbg_func("port %p timed wait done ret %d\n", port, wait_ret); + } + + if (curr->count) { + mpp_assert(!list_empty(&curr->list)); + ret = MPP_OK; + } + } + } +RET: + mpp_task_dbg_func("leave port %p ret %d\n", port, ret); + return ret; } MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) { MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; + MppTaskStatusInfo *curr = NULL; + MppTaskStatusInfo *next = NULL; + MppTaskImpl *task_impl = NULL; + MppTask p = NULL; AutoMutex auto_lock(queue->lock); - MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr]; - MppTaskStatusInfo *next = &queue->info[port_impl->next_on_dequeue]; + MPP_RET ret = MPP_NOK; + + mpp_task_dbg_func("enter port %p\n", port); + + if (!queue->ready) { + mpp_err("try to dequeue when %s queue is not ready\n", + (port_impl->type == MPP_PORT_INPUT) ? + ("input") : ("output")); + goto RET; + } + + curr = &queue->info[port_impl->status_curr]; + next = &queue->info[port_impl->next_on_dequeue]; *task = NULL; if (curr->count == 0) { mpp_assert(list_empty(&curr->list)); - return MPP_OK; + goto RET; } - MppTaskImpl *task_impl = list_entry(curr->list.next, MppTaskImpl, list); - MppTask p = (MppTask)task_impl; + mpp_assert(!list_empty(&curr->list)); + task_impl = list_entry(curr->list.next, MppTaskImpl, list); + p = (MppTask)task_impl; check_mpp_task_name(p); list_del_init(&task_impl->list); curr->count--; @@ -146,8 +219,11 @@ MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) task_impl->status = next->status; *task = p; + ret = MPP_OK; +RET: + mpp_task_dbg_func("leave port %p ret %d\n", port, ret); - return MPP_OK; + return ret; } MPP_RET mpp_port_enqueue(MppPort port, MppTask task) @@ -155,14 +231,28 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task) MppTaskImpl *task_impl = (MppTaskImpl *)task; MppPortImpl *port_impl = (MppPortImpl *)port; MppTaskQueueImpl *queue = port_impl->queue; + MppTaskStatusInfo *curr = NULL; + MppTaskStatusInfo *next = NULL; + + AutoMutex auto_lock(queue->lock); + MPP_RET ret = MPP_NOK; + + mpp_task_dbg_func("enter port %p\n", port); + + if (!queue->ready) { + mpp_err("try to enqueue when %s queue is not ready\n", + (port_impl->type == MPP_PORT_INPUT) ? + ("input") : ("output")); + goto RET; + } + check_mpp_task_name(task); mpp_assert(task_impl->queue == (MppTaskQueue *)queue); mpp_assert(task_impl->status == port_impl->next_on_dequeue); - AutoMutex auto_lock(queue->lock); - MppTaskStatusInfo *curr = &queue->info[task_impl->status]; - MppTaskStatusInfo *next = &queue->info[port_impl->next_on_enqueue]; + curr = &queue->info[task_impl->status]; + next = &queue->info[port_impl->next_on_enqueue]; list_del_init(&task_impl->list); curr->count--; @@ -170,7 +260,13 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task) next->count++; task_impl->status = next->status; - return MPP_OK; + next->cond->signal(); + mpp_task_dbg_func("signal port %p\n", next); + ret = MPP_OK; +RET: + mpp_task_dbg_func("leave port %p ret %d\n", port, ret); + + return ret; } MPP_RET mpp_task_queue_init(MppTaskQueue *queue) @@ -180,54 +276,71 @@ MPP_RET mpp_task_queue_init(MppTaskQueue *queue) return MPP_ERR_NULL_PTR; } + MPP_RET ret = MPP_NOK; MppTaskQueueImpl *p = NULL; - MppTaskImpl *tasks = NULL; Mutex *lock = NULL; + Condition *cond[MPP_TASK_STATUS_BUTT] = { NULL }; + RK_S32 i; - do { - RK_S32 i; - - p = mpp_calloc(MppTaskQueueImpl, 1); - if (NULL == p) { - mpp_err_f("malloc queue failed\n"); - break; - } - lock = new Mutex(); - if (NULL == lock) { - mpp_err_f("new lock failed\n"); - break;; - } - - for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) { - INIT_LIST_HEAD(&p->info[i].list); - p->info[i].count = 0; - p->info[i].status = (MppTaskStatus)i; - } - - p->lock = lock; - p->tasks = tasks; - - if (mpp_port_init(p, MPP_PORT_INPUT, &p->input)) - break; - - if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) { - mpp_port_deinit(p->input); - break; - } - - *queue = p; - return MPP_OK; - } while (0); - - if (p) - mpp_free(p); - if (lock) - delete lock; - if (tasks) - mpp_free(tasks); + mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0); + mpp_task_dbg_func("enter\n"); *queue = NULL; - return MPP_NOK; + + p = mpp_calloc(MppTaskQueueImpl, 1); + if (NULL == p) { + mpp_err_f("malloc queue failed\n"); + goto RET; + } + + cond[MPP_INPUT_PORT] = new Condition(); + cond[MPP_INPUT_HOLD] = NULL; + cond[MPP_OUTPUT_PORT] = new Condition(); + cond[MPP_OUTPUT_HOLD] = NULL; + + if (NULL == cond[MPP_INPUT_PORT] || + NULL == cond[MPP_OUTPUT_PORT]) { + mpp_err_f("new condition failed\n"); + goto RET; + } + + for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) { + INIT_LIST_HEAD(&p->info[i].list); + p->info[i].count = 0; + p->info[i].status = (MppTaskStatus)i; + p->info[i].cond = cond[i]; + } + + lock = new Mutex(); + if (NULL == lock) { + mpp_err_f("new lock failed\n"); + goto RET; + } + + p->lock = lock; + + if (mpp_port_init(p, MPP_PORT_INPUT, &p->input)) + goto RET; + + if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) { + mpp_port_deinit(p->input); + goto RET; + } + + ret = MPP_OK; +RET: + if (ret) { + if (lock) + delete lock; + MPP_FREE(cond[MPP_INPUT_PORT]); + MPP_FREE(cond[MPP_OUTPUT_PORT]); + MPP_FREE(p); + } + + *queue = p; + + mpp_task_dbg_func("leave ret %d queue %p\n", ret, p); + return ret; } MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count) @@ -260,6 +373,7 @@ MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count) list_add_tail(&tasks[i].list, &info->list); info->count++; } + impl->ready = 1; return MPP_OK; } @@ -271,14 +385,11 @@ MPP_RET mpp_task_queue_deinit(MppTaskQueue queue) } MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue; - if (p->input) { - mpp_port_deinit(p->input); - p->input = NULL; - } - if (p->output) { - mpp_port_deinit(p->output); - p->output = NULL; - } + p->lock->lock(); + + p->ready = 0; + p->info[MPP_INPUT_PORT].cond->signal(); + p->info[MPP_OUTPUT_PORT].cond->signal(); if (p->tasks) { for (RK_S32 i = 0; i < p->task_count; i++) { /* we must ensure that all task return to init status */ @@ -288,6 +399,16 @@ MPP_RET mpp_task_queue_deinit(MppTaskQueue queue) } mpp_free(p->tasks); } + + if (p->input) { + mpp_port_deinit(p->input); + p->input = NULL; + } + if (p->output) { + mpp_port_deinit(p->output); + p->output = NULL; + } + p->lock->unlock(); if (p->lock) delete p->lock; mpp_free(p); diff --git a/mpp/codec/mpp_dec.cpp b/mpp/codec/mpp_dec.cpp index b9b13ecc..cf0a3840 100644 --- a/mpp/codec/mpp_dec.cpp +++ b/mpp/codec/mpp_dec.cpp @@ -903,7 +903,11 @@ void *mpp_dec_advanced_thread(void *data) mpp_buf_slot_ready(frame_slots); } - mpp_assert(slot_size == buffer_size); + if (slot_size != buffer_size) { + mpp_err_f("slot size %d is not equal to buffer size %d\n", + slot_size, buffer_size); + mpp_assert(slot_size == buffer_size); + } } mpp_buf_slot_set_prop(frame_slots, task_dec->output, SLOT_BUFFER, output_buffer); @@ -943,6 +947,7 @@ void *mpp_dec_advanced_thread(void *data) mpp_task = NULL; // send finished task to output port + mpp_port_poll(output, MPP_POLL_BLOCK); mpp_port_dequeue(output, &mpp_task); mpp_task_meta_set_frame(mpp_task, KEY_OUTPUT_FRAME, frame); diff --git a/mpp/codec/mpp_enc.cpp b/mpp/codec/mpp_enc.cpp index 78e83544..4660cee0 100644 --- a/mpp/codec/mpp_enc.cpp +++ b/mpp/codec/mpp_enc.cpp @@ -171,6 +171,7 @@ void *mpp_enc_control_thread(void *data) mpp_task = NULL; // send finished task to output port + mpp_port_poll(output, MPP_POLL_BLOCK); mpp_port_dequeue(output, &mpp_task); mpp_task_meta_set_packet(mpp_task, KEY_OUTPUT_PACKET, packet); diff --git a/mpp/legacy/vpu_api_legacy.cpp b/mpp/legacy/vpu_api_legacy.cpp index 6941fde0..fe743f74 100755 --- a/mpp/legacy/vpu_api_legacy.cpp +++ b/mpp/legacy/vpu_api_legacy.cpp @@ -183,8 +183,6 @@ VpuApiLegacy::VpuApiLegacy() : init_ok(0), frame_count(0), set_eos(0), - block_input(0), - block_output(0), fp(NULL), fp_buf(NULL), memGroup(NULL), @@ -242,10 +240,15 @@ RK_S32 VpuApiLegacy::init(VpuCodecContext *ctx, RK_U8 *extraData, RK_U32 extra_s } if (CODEC_DECODER == ctx->codecType) { - block_input = 0; - block_output = 0; type = MPP_CTX_DEC; } else if (CODEC_ENCODER == ctx->codecType) { + MppPollType block = MPP_POLL_BLOCK; + + /* setup input / output block mode */ + ret = mpi->control(mpp_ctx, MPP_SET_INPUT_BLOCK, (MppParam)&block); + if (MPP_OK != ret) + mpp_err("mpi->control MPP_SET_INPUT_BLOCK failed\n"); + if (memGroup == NULL) { ret = mpp_buffer_group_get_internal(&memGroup, MPP_BUFFER_TYPE_ION); if (MPP_OK != ret) { @@ -253,24 +256,12 @@ RK_S32 VpuApiLegacy::init(VpuCodecContext *ctx, RK_U8 *extraData, RK_U32 extra_s return ret; } } - block_input = 1; - block_output = 0; + type = MPP_CTX_ENC; } else { return MPP_ERR_VPU_CODEC_INIT; } - /* setup input / output block mode */ - ret = mpi->control(mpp_ctx, MPP_SET_INPUT_BLOCK, (MppParam)&block_input); - if (MPP_OK != ret) { - mpp_err("mpi->control MPP_SET_INPUT_BLOCK failed\n"); - } - - ret = mpi->control(mpp_ctx, MPP_SET_OUTPUT_BLOCK, (MppParam)&block_output); - if (MPP_OK != ret) { - mpp_err("mpi->control MPP_SET_OUTPUT_BLOCK failed\n"); - } - ret = mpp_init(mpp_ctx, type, (MppCodingType)ctx->videoCoding); if (ret) { mpp_err_f(" init error. \n"); @@ -531,60 +522,56 @@ RK_S32 VpuApiLegacy::decode(VpuCodecContext *ctx, VideoPacket_t *pkt, DecoderOut vpu_api_dbg_func("mpp import input fd %d output fd %d", mpp_buffer_get_fd(str_buf), mpp_buffer_get_fd(pic_buf)); - do { - ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); - if (ret) { - mpp_err("mpp task input dequeue failed\n"); - goto DECODE_OUT; - } - if (task == NULL) { - vpu_api_dbg_func("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); - msleep(3); - } else - break; - } while (1); + ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp input poll failed\n"); + goto DECODE_OUT; + } + + ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); + if (ret) { + mpp_err("mpp task input dequeue failed\n"); + goto DECODE_OUT; + } mpp_task_meta_set_packet(task, KEY_INPUT_PACKET, packet); mpp_task_meta_set_frame (task, KEY_OUTPUT_FRAME, mframe); - if (mpi != NULL) { - ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); + ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); + if (ret) { + mpp_err("mpp task input enqueue failed\n"); + goto DECODE_OUT; + } + + pkt->size = 0; + task = NULL; + + ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp output poll failed\n"); + goto DECODE_OUT; + } + + ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task); + if (ret) { + mpp_err("ret %d mpp task output dequeue failed\n", ret); + goto DECODE_OUT; + } + + if (task) { + MppFrame frame_out = NULL; + + mpp_task_meta_get_frame(task, KEY_OUTPUT_FRAME, &frame_out); + mpp_assert(frame_out == mframe); + vpu_api_dbg_func("decoded frame %d\n", frame_count); + frame_count++; + + ret = mpi->enqueue(mpp_ctx, MPP_PORT_OUTPUT, task); if (ret) { - mpp_err("mpp task input enqueue failed\n"); + mpp_err("mpp task output enqueue failed\n"); goto DECODE_OUT; } - - pkt->size = 0; task = NULL; - - do { - ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task); - if (ret) { - mpp_err("ret %d mpp task output dequeue failed\n", ret); - goto DECODE_OUT; - } - - if (task) { - MppFrame frame_out = NULL; - - mpp_task_meta_get_frame(task, KEY_OUTPUT_FRAME, &frame_out); - mpp_assert(frame_out == mframe); - vpu_api_dbg_func("decoded frame %d\n", frame_count); - frame_count++; - - ret = mpi->enqueue(mpp_ctx, MPP_PORT_OUTPUT, task); - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - goto DECODE_OUT; - } - task = NULL; - - break; - } - msleep(3); - } while (1); - } else { - mpp_err("mpi pointer is NULL, failed!"); } // copy decoded frame into output buffer, and set outpub frame size @@ -700,16 +687,13 @@ RK_S32 VpuApiLegacy::decode_sendstream(VideoPacket_t *pkt) vpu_api_dbg_input("input size %-6d flag %x pts %lld\n", pkt->size, pkt->nFlags, pkt->pts); - do { - ret = mpi->decode_put_packet(mpp_ctx, mpkt); - if (ret == MPP_OK) { - pkt->size = 0; - break; - } else { - /* reduce cpu overhead here */ - msleep(1); - } - } while (block_input); + ret = mpi->decode_put_packet(mpp_ctx, mpkt); + if (ret == MPP_OK) { + pkt->size = 0; + } else { + /* reduce cpu overhead here */ + msleep(1); + } mpp_packet_deinit(&mpkt); @@ -896,80 +880,88 @@ RK_S32 VpuApiLegacy::encode(VpuCodecContext *ctx, EncInputStream_t *aEncInStrm, vpu_api_dbg_func("mpp import input fd %d output fd %d", mpp_buffer_get_fd(pic_buf), mpp_buffer_get_fd(str_buf)); - do { - ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); - if (ret) { - mpp_err("mpp task input dequeue failed\n"); - goto ENCODE_OUT; - } - if (task == NULL) { - vpu_api_dbg_func("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); - msleep(3); - } else - break; - } while (1); + ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp input poll failed\n"); + goto ENCODE_OUT; + } + + ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); + if (ret) { + mpp_err("mpp task input dequeue failed\n"); + goto ENCODE_OUT; + } + if (task == NULL) { + mpp_err("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); + goto ENCODE_OUT; + } mpp_task_meta_set_frame (task, KEY_INPUT_FRAME, frame); mpp_task_meta_set_packet(task, KEY_OUTPUT_PACKET, packet); - if (mpi != NULL) { - ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); + ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); + if (ret) { + mpp_err("mpp task input enqueue failed\n"); + goto ENCODE_OUT; + } + task = NULL; + + ret = mpi->poll(mpp_ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp output poll failed\n"); + goto ENCODE_OUT; + } + + ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task); + if (ret) { + mpp_err("ret %d mpp task output dequeue failed\n", ret); + goto ENCODE_OUT; + } + + mpp_assert(task); + + if (task) { + MppFrame frame_out = NULL; + MppFrame packet_out = NULL; + + mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, &packet_out); + + mpp_assert(packet_out == packet); + vpu_api_dbg_func("encoded frame %d\n", frame_count); + frame_count++; + + ret = mpi->enqueue(mpp_ctx, MPP_PORT_OUTPUT, task); if (ret) { - mpp_err("mpp task input enqueue failed\n"); + mpp_err("mpp task output enqueue failed\n"); goto ENCODE_OUT; } task = NULL; - do { - ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task); - if (ret) { - mpp_err("ret %d mpp task output dequeue failed\n", ret); - goto ENCODE_OUT; - } + ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp input poll failed\n"); + goto ENCODE_OUT; + } - if (task) { - MppFrame frame_out = NULL; - MppFrame packet_out = NULL; - - mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, &packet_out); - - mpp_assert(packet_out == packet); - vpu_api_dbg_func("encoded frame %d\n", frame_count); - frame_count++; - - ret = mpi->enqueue(mpp_ctx, MPP_PORT_OUTPUT, task); - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - goto ENCODE_OUT; - } - task = NULL; - - // dequeue task from MPP_PORT_INPUT - ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); - if (ret) { - mpp_log_f("failed to dequeue from input port ret %d\n", ret); - break; - } - mpp_assert(task); - ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out); - mpp_assert(frame_out == frame); - ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - goto ENCODE_OUT; - } - task = NULL; - - break; - } - msleep(3); - } while (1); - } else { - mpp_err("mpi pointer is NULL, failed!"); + // dequeue task from MPP_PORT_INPUT + ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task); + if (ret) { + mpp_log_f("failed to dequeue from input port ret %d\n", ret); + goto ENCODE_OUT; + } + mpp_assert(task); + ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out); + mpp_assert(frame_out == frame); + ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task); + if (ret) { + mpp_err("mpp task output enqueue failed\n"); + goto ENCODE_OUT; + } + task = NULL; } // copy encoded stream into output buffer, and set output stream size - if (packet != NULL) { + if (packet) { RK_U32 eos = mpp_packet_get_eos(packet); RK_S64 pts = mpp_packet_get_pts(packet); RK_U32 flag = mpp_packet_get_flag(packet); diff --git a/mpp/legacy/vpu_api_legacy.h b/mpp/legacy/vpu_api_legacy.h index 4cdb6307..50cbc2ae 100644 --- a/mpp/legacy/vpu_api_legacy.h +++ b/mpp/legacy/vpu_api_legacy.h @@ -73,9 +73,6 @@ private: RK_U32 frame_count; RK_U32 set_eos; - RK_U32 block_input; - RK_U32 block_output; - FILE *fp; RK_U8 *fp_buf; diff --git a/mpp/mpi.cpp b/mpp/mpi.cpp index f5f0c9c6..0e1501c0 100644 --- a/mpp/mpi.cpp +++ b/mpp/mpi.cpp @@ -285,6 +285,32 @@ static MPP_RET mpi_isp_get_frame(MppCtx ctx, MppFrame *frame) return ret; } +static MPP_RET mpi_poll(MppCtx ctx, MppPortType type, MppPollType timeout) +{ + MPP_RET ret = MPP_NOK; + MpiImpl *p = (MpiImpl *)ctx; + + mpi_dbg_func("enter ctx %p type %d timeout %d\n", ctx, type, timeout); + do { + ret = check_mpp_ctx(p); + if (ret) + break;; + + if (type >= MPP_PORT_BUTT || + timeout < MPP_POLL_BUTT || + timeout > MPP_POLL_MAX) { + mpp_err_f("invalid input type %d timeout %d\n", type, timeout); + ret = MPP_ERR_UNKNOW; + break; + } + + ret = p->ctx->poll(type, timeout); + } while (0); + + mpi_dbg_func("leave ret %d\n", ret); + return ret; +} + static MPP_RET mpi_dequeue(MppCtx ctx, MppPortType type, MppTask *task) { MPP_RET ret = MPP_NOK; @@ -381,6 +407,7 @@ static MppApi mpp_api = { mpi_isp, mpi_isp_put_frame, mpi_isp_get_frame, + mpi_poll, mpi_dequeue, mpi_enqueue, mpi_reset, diff --git a/mpp/mpp.cpp b/mpp/mpp.cpp index 59d75a0d..2142a23f 100644 --- a/mpp/mpp.cpp +++ b/mpp/mpp.cpp @@ -50,8 +50,8 @@ Mpp::Mpp() mOutputPort(NULL), mInputTaskQueue(NULL), mOutputTaskQueue(NULL), - mInputBlock(0), - mOutputBlock(0), + mInputBlock(MPP_POLL_NON_BLOCK), + mOutputBlock(MPP_POLL_NON_BLOCK), mThreadCodec(NULL), mThreadHal(NULL), mDec(NULL), @@ -60,7 +60,6 @@ Mpp::Mpp() mCoding(MPP_VIDEO_CodingUnused), mInitDone(0), mMultiFrame(0), - mInputTask(NULL), mStatus(0), mParserFastMode(0), mParserNeedSplit(0), @@ -270,8 +269,9 @@ MPP_RET Mpp::get_frame(MppFrame *frame) if (0 == mFrames->list_size()) { mThreadCodec->signal(); - if (mOutputBlock) + if (mOutputBlock == MPP_POLL_BLOCK) mFrames->wait(); + /* NOTE: this sleep is to avoid user's dead loop */ msleep(1); } @@ -302,61 +302,58 @@ MPP_RET Mpp::put_frame(MppFrame frame) return MPP_NOK; MPP_RET ret = MPP_NOK; - MppTask task = mInputTask; + MppTask task = NULL; - do { - if (NULL == task) { - ret = dequeue(MPP_PORT_INPUT, &task); - if (ret) { - mpp_log_f("failed to dequeue from input port ret %d\n", ret); - break; - } - } + ret = poll(MPP_PORT_INPUT, mInputBlock); + if (ret) { + mpp_log_f("poll on set timeout %d ret %d\n", mInputBlock, ret); + goto RET; + } - /* FIXME: use wait to do block wait */ - if (mInputBlock && NULL == task) { - msleep(2); - continue; - } + ret = dequeue(MPP_PORT_INPUT, &task); + if (ret || NULL == task) { + mpp_log_f("dequeue on set ret %d task %p\n", ret, task); + goto RET; + } + mpp_assert(task); + + ret = mpp_task_meta_set_frame(task, KEY_INPUT_FRAME, frame); + if (ret) { + mpp_log_f("set input frame to task ret %d\n", ret); + goto RET; + } + + ret = enqueue(MPP_PORT_INPUT, task); + if (ret) { + mpp_log_f("enqueue ret %d\n", ret); + goto RET; + } + + ret = poll(MPP_PORT_INPUT, mInputBlock); + if (ret) { + mpp_log_f("poll on get timeout %d ret %d\n", mInputBlock, ret); + goto RET; + } + + ret = dequeue(MPP_PORT_INPUT, &task); + if (ret) { + mpp_log_f("dequeue on get ret %d\n", ret); + goto RET; + } + + if (mInputBlock != MPP_POLL_NON_BLOCK) mpp_assert(task); - ret = mpp_task_meta_set_frame(task, KEY_INPUT_FRAME, frame); - if (ret) { - mpp_log_f("failed to set input frame to task ret %d\n", ret); - break; + if (task) { + ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame); + if (frame) { + mpp_frame_deinit(&frame); + frame = NULL; } + } - ret = enqueue(MPP_PORT_INPUT, task); - if (ret) { - mpp_log_f("failed to enqueue task to input port ret %d\n", ret); - break; - } - - if (mInputBlock) { - while (MPP_NOK == mpp_port_can_dequeue(mInputPort)) { - msleep(2); - } - - ret = dequeue(MPP_PORT_INPUT, &task); - if (ret) { - mpp_log_f("failed to dequeue from input port ret %d\n", ret); - break; - } - - mpp_assert(task); - ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame); - if (frame) { - mpp_frame_deinit(&frame); - frame = NULL; - } - } - - break; - } while (1); - - mInputTask = task; - +RET: return ret; } @@ -368,45 +365,61 @@ MPP_RET Mpp::get_packet(MppPacket *packet) MPP_RET ret = MPP_OK; MppTask task = NULL; - do { - if (NULL == task) { - ret = dequeue(MPP_PORT_OUTPUT, &task); - if (ret) { - mpp_log_f("failed to dequeue from output port ret %d\n", ret); - break; - } - } + ret = poll(MPP_PORT_OUTPUT, mOutputBlock); + if (ret) { + mpp_log_f("poll on get timeout %d ret %d\n", mOutputBlock, ret); + goto RET; + } - /* FIXME: use wait to do block wait */ - if (NULL == task) { - if (mOutputBlock) { - msleep(2); - continue; - } else { - break; - } - } + ret = dequeue(MPP_PORT_OUTPUT, &task); + if (ret || NULL == task) { + mpp_log_f("dequeue on get ret %d task %p\n", ret, task); + goto RET; + } - mpp_assert(task); + mpp_assert(task); - ret = mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, packet); - if (ret) { - mpp_log_f("failed to get output packet from task ret %d\n", ret); - break; - } + ret = mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, packet); + if (ret) { + mpp_log_f("get output packet from task ret %d\n", ret); + goto RET; + } - mpp_assert(*packet); + mpp_assert(*packet); - if (mpp_debug & MPP_DBG_PTS) - mpp_log_f("pts %lld\n", mpp_packet_get_pts(*packet)); + if (mpp_debug & MPP_DBG_PTS) + mpp_log_f("pts %lld\n", mpp_packet_get_pts(*packet)); - ret = enqueue(MPP_PORT_OUTPUT, task); - if (ret) { - mpp_log_f("failed to enqueue task to output port ret %d\n", ret); - } + ret = enqueue(MPP_PORT_OUTPUT, task); + if (ret) + mpp_log_f("enqueue on set ret %d\n", ret); +RET: - break; - } while (1); + return ret; +} + +MPP_RET Mpp::poll(MppPortType type, MppPollType timeout) +{ + if (!mInitDone) + return MPP_NOK; + + MPP_RET ret = MPP_NOK; + AutoMutex autoLock(mPortLock); + MppTaskQueue port = NULL; + + switch (type) { + case MPP_PORT_INPUT : { + port = mInputPort; + } break; + case MPP_PORT_OUTPUT : { + port = mOutputPort; + } break; + default : { + } break; + } + + if (port) + ret = mpp_port_poll(port, timeout); return ret; } @@ -419,6 +432,7 @@ MPP_RET Mpp::dequeue(MppPortType type, MppTask *task) MPP_RET ret = MPP_NOK; AutoMutex autoLock(mPortLock); MppTaskQueue port = NULL; + switch (type) { case MPP_PORT_INPUT : { port = mInputPort; @@ -444,6 +458,7 @@ MPP_RET Mpp::enqueue(MppPortType type, MppTask task) MPP_RET ret = MPP_NOK; AutoMutex autoLock(mPortLock); MppTaskQueue port = NULL; + switch (type) { case MPP_PORT_INPUT : { port = mInputPort; @@ -581,11 +596,11 @@ MPP_RET Mpp::control_mpp(MpiCmd cmd, MppParam param) switch (cmd) { case MPP_SET_INPUT_BLOCK: { - RK_U32 block = *((RK_U32 *)param); + MppPollType block = *((MppPollType *)param); mInputBlock = block; } break; case MPP_SET_OUTPUT_BLOCK: { - RK_U32 block = *((RK_U32 *)param); + MppPollType block = *((MppPollType *)param); mOutputBlock = block; } break; default : { diff --git a/mpp/mpp.h b/mpp/mpp.h index a582e8dc..05c85120 100644 --- a/mpp/mpp.h +++ b/mpp/mpp.h @@ -71,6 +71,7 @@ public: MPP_RET put_frame(MppFrame frame); MPP_RET get_packet(MppPacket *packet); + MPP_RET poll(MppPortType type, MppPollType timeout); MPP_RET dequeue(MppPortType type, MppTask *task); MPP_RET enqueue(MppPortType type, MppTask task); @@ -109,8 +110,8 @@ public: MppTaskQueue mInputTaskQueue; MppTaskQueue mOutputTaskQueue; - RK_U32 mInputBlock; - RK_U32 mOutputBlock; + MppPollType mInputBlock; + MppPollType mOutputBlock; /* * There are two threads for each decoder/encoder: codec thread and hal thread * @@ -135,9 +136,6 @@ private: RK_U32 mInitDone; RK_U32 mMultiFrame; - // task for put_frame / put_packet - MppTask mInputTask; - RK_U32 mStatus; /* decoder paramter before init */ diff --git a/test/mpi_dec_test.c b/test/mpi_dec_test.c index 4bb2ca86..90202049 100644 --- a/test/mpi_dec_test.c +++ b/test/mpi_dec_test.c @@ -266,20 +266,19 @@ static int decode_advanced(MpiDecLoopData *data) if (pkt_eos) mpp_packet_set_eos(packet); - do { - ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); /* input queue */ - if (ret) { - mpp_err("mpp task input dequeue failed\n"); - return ret; - } + ret = mpi->poll(ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp input poll failed\n"); + return ret; + } - if (task == NULL) { - mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); - msleep(3); - } else { - break; - } - } while (1); + ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); /* input queue */ + if (ret) { + mpp_err("mpp task input dequeue failed\n"); + return ret; + } + + mpp_assert(task); mpp_task_meta_set_packet(task, KEY_INPUT_PACKET, packet); mpp_task_meta_set_frame (task, KEY_OUTPUT_FRAME, frame); @@ -290,46 +289,49 @@ static int decode_advanced(MpiDecLoopData *data) return ret; } - msleep(20); + /* poll and wait here */ + ret = mpi->poll(ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp output poll failed\n"); + return ret; + } - do { - ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); /* output queue */ - if (ret) { - mpp_err("mpp task output dequeue failed\n"); - return ret; - } + ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); /* output queue */ + if (ret) { + mpp_err("mpp task output dequeue failed\n"); + return ret; + } - if (task) { - MppFrame frame_out = NULL; - mpp_task_meta_get_frame(task, KEY_OUTPUT_FRAME, &frame_out); - //mpp_assert(packet_out == packet); + mpp_assert(task); - if (frame) { - /* write frame to file here */ - MppBuffer buf_out = mpp_frame_get_buffer(frame_out); + if (task) { + MppFrame frame_out = NULL; + mpp_task_meta_get_frame(task, KEY_OUTPUT_FRAME, &frame_out); + //mpp_assert(packet_out == packet); - if (buf_out) { - void *ptr = mpp_buffer_get_ptr(buf_out); - size_t len = mpp_buffer_get_size(buf_out); + if (frame) { + /* write frame to file here */ + MppBuffer buf_out = mpp_frame_get_buffer(frame_out); - if (data->fp_output) - fwrite(ptr, 1, len, data->fp_output); + if (buf_out) { + void *ptr = mpp_buffer_get_ptr(buf_out); + size_t len = mpp_buffer_get_size(buf_out); - mpp_log("decoded frame %d size %d\n", data->frame_count, len); - } + if (data->fp_output) + fwrite(ptr, 1, len, data->fp_output); - if (mpp_frame_get_eos(frame_out)) - mpp_log("found eos frame\n"); + mpp_log("decoded frame %d size %d\n", data->frame_count, len); } - ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); /* output queue */ - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - return ret; - } - break; + if (mpp_frame_get_eos(frame_out)) + mpp_log("found eos frame\n"); } - } while (1); + + /* output queue */ + ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); + if (ret) + mpp_err("mpp task output enqueue failed\n"); + } return ret; } @@ -419,7 +421,7 @@ int mpi_dec_test_decode(MpiDecTestCmd *cmd) goto MPP_TEST_OUT; } - ret = mpp_buffer_get(data.frm_grp, &frm_buf, width * height * 3 / 2); + ret = mpp_buffer_get(data.frm_grp, &frm_buf, width * height * 2); if (ret) { mpp_err("failed to get buffer for input frame ret %d\n", ret); goto MPP_TEST_OUT; diff --git a/test/mpi_enc_test.c b/test/mpi_enc_test.c index bb5ab435..05e88d66 100644 --- a/test/mpi_enc_test.c +++ b/test/mpi_enc_test.c @@ -461,26 +461,25 @@ int mpi_enc_test(MpiEncTestCmd *cmd) mpp_packet_init_with_buffer(&packet, pkt_buf_out); - do { - ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); - if (ret) { - mpp_err("mpp task input dequeue failed\n"); - goto MPP_TEST_OUT; - } - if (task == NULL) { - mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); - msleep(3); - } else { - MppFrame frame_out = NULL; + ret = mpi->poll(ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp task input poll failed ret %d\n", ret); + goto MPP_TEST_OUT; + } - mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out); - if (frame_out) - mpp_assert(frame_out == frame); + ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); + if (ret || NULL == task) { + mpp_err("mpp task input dequeue failed ret %d task %p\n", ret, task); + goto MPP_TEST_OUT; + } - break; - } - } while (1); + if (task) { + MppFrame frame_out = NULL; + mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out); + if (frame_out) + mpp_assert(frame_out == frame); + } mpp_task_meta_set_frame (task, KEY_INPUT_FRAME, frame); mpp_task_meta_set_packet(task, KEY_OUTPUT_PACKET, packet); @@ -513,50 +512,51 @@ int mpi_enc_test(MpiEncTestCmd *cmd) goto MPP_TEST_OUT; } - msleep(20); + ret = mpi->poll(ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp task output poll failed ret %d\n", ret); + goto MPP_TEST_OUT; + } - do { - ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); + ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); + if (ret || NULL == task) { + mpp_err("mpp task output dequeue failed ret %d task %p\n", ret, task); + goto MPP_TEST_OUT; + } + + if (task) { + MppFrame packet_out = NULL; + + mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, &packet_out); + + mpp_assert(packet_out == packet); + if (packet) { + // write packet to file here + void *ptr = mpp_packet_get_pos(packet); + size_t len = mpp_packet_get_length(packet); + + pkt_eos = mpp_packet_get_eos(packet); + + if (fp_output) + fwrite(ptr, 1, len, fp_output); + mpp_packet_deinit(&packet); + + mpp_log_f("encoded frame %d size %d\n", frame_count, len); + stream_size += len; + + if (pkt_eos) { + mpp_log("found last packet\n"); + mpp_assert(frm_eos); + } + } + frame_count++; + + ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); if (ret) { - mpp_err("mpp task output dequeue failed\n"); + mpp_err("mpp task output enqueue failed\n"); goto MPP_TEST_OUT; } - - if (task) { - MppFrame packet_out = NULL; - - mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, &packet_out); - - mpp_assert(packet_out == packet); - if (packet) { - // write packet to file here - void *ptr = mpp_packet_get_pos(packet); - size_t len = mpp_packet_get_length(packet); - - pkt_eos = mpp_packet_get_eos(packet); - - if (fp_output) - fwrite(ptr, 1, len, fp_output); - mpp_packet_deinit(&packet); - - mpp_log_f("encoded frame %d size %d\n", frame_count, len); - stream_size += len; - - if (pkt_eos) { - mpp_log("found last packet\n"); - mpp_assert(frm_eos); - } - } - frame_count++; - - ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); - if (ret) { - mpp_err("mpp task output enqueue failed\n"); - goto MPP_TEST_OUT; - } - break; - } - } while (1); + } if (num_frames && frame_count >= num_frames) { mpp_log_f("encode max %d frames", frame_count); diff --git a/test/mpi_rc_test.c b/test/mpi_rc_test.c index 42f49f98..85614694 100644 --- a/test/mpi_rc_test.c +++ b/test/mpi_rc_test.c @@ -821,25 +821,25 @@ static MPP_RET mpi_rc_codec(MpiRcTestCtx *ctx) mpp_packet_init_with_buffer(&packet, pkt_buf_out); - do { - ret = enc_mpi->dequeue(enc_ctx, MPP_PORT_INPUT, &enc_task); - if (ret) { - mpp_err("mpp task input dequeue failed\n"); - goto MPP_TEST_OUT; - } - if (enc_task == NULL) { - mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!"); - msleep(3); - } else { - MppFrame tmp_frm = NULL; + ret = enc_mpi->poll(enc_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK); + if (ret) { + mpp_err("mpp input poll failed\n"); + goto MPP_TEST_OUT; + } - mpp_task_meta_get_frame(enc_task, KEY_INPUT_FRAME, &tmp_frm); - if (tmp_frm) - mpp_assert(tmp_frm == frame_in); + ret = enc_mpi->dequeue(enc_ctx, MPP_PORT_INPUT, &enc_task); + if (ret) { + mpp_err("mpp task input dequeue failed\n"); + goto MPP_TEST_OUT; + } - break; - } - } while (1); + mpp_assert(enc_task); + { + MppFrame tmp_frm = NULL; + mpp_task_meta_get_frame(enc_task, KEY_INPUT_FRAME, &tmp_frm); + if (tmp_frm) + mpp_assert(tmp_frm == frame_in); + } mpp_task_meta_set_frame (enc_task, KEY_INPUT_FRAME, frame_in); mpp_task_meta_set_packet(enc_task, KEY_OUTPUT_PACKET, packet);