[mpi]: Add poll interface to mpi

1. Add poll with timeout. Support three mode: block/non-block/timeout.
2. Change control MPP_SET_INPUT_BLOCK and MPP_SET_OUTPUT_BLOCK.
3. Remove msleep in most mpp interface.

Change-Id: I39d9a261b6f1da66c6cb944abd71d1e7f4928d2d
Signed-off-by: Herman Chen <herman.chen@rock-chips.com>
This commit is contained in:
Herman Chen
2016-12-20 15:10:17 +08:00
parent 1bbd1dcce6
commit d8ed1f4eb1
13 changed files with 599 additions and 419 deletions

View File

@@ -90,6 +90,21 @@ typedef enum {
MPP_TASK_WORK_MODE_BUTT,
} MppTaskWorkMode;
/*
* Mpp port pull type
*
* MPP_POLL_BLOCK - for block poll
* MPP_POLL_NON_BLOCK - for non-block poll
* small than MPP_POLL_MAX - for poll with timeout in ms
* small than MPP_POLL_BUTT or larger than MPP_POLL_MAX is invalid value
*/
typedef enum {
MPP_POLL_BUTT = -2,
MPP_POLL_BLOCK = -1,
MPP_POLL_NON_BLOCK = 0,
MPP_POLL_MAX = 1000,
} MppPollType;
/*
* MppTask is descriptor of a task which send to mpp for process
* mpp can support different type of work mode, for example:
@@ -219,7 +234,7 @@ MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count);
MPP_RET mpp_task_queue_deinit(MppTaskQueue queue);
MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type);
MPP_RET mpp_port_can_dequeue(MppPort port);
MPP_RET mpp_port_poll(MppPort port, MppPollType timeout);
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task);
MPP_RET mpp_port_enqueue(MppPort port, MppTask task);

View File

@@ -229,12 +229,19 @@ typedef struct MppApi_t {
MPP_RET (*isp_get_frame)(MppCtx ctx, MppFrame *frame);
// advance data flow interface
/**
* @brief poll port for dequeue
* @param ctx The context of mpp
* @param type input port or output port which are both for data transaction
* @return 0 for success there is valid task for dequeue, others for failure
*/
MPP_RET (*poll)(MppCtx ctx, MppPortType type, MppPollType timeout);
/**
* @brief dequeue MppTask
* @param ctx The context of mpp
* @param type input port or output port which are both for data transaction
* @param task MppTask which is sent to mpp for process
* @return 0 for success, oters for failure
* @return 0 for success, others for failure
*/
MPP_RET (*dequeue)(MppCtx ctx, MppPortType type, MppTask *task);
/**
@@ -242,7 +249,7 @@ typedef struct MppApi_t {
* @param ctx The context of mpp
* @param type input port or output port which are both for data transaction
* @param task MppTask which is sent to mpp for process
* @return 0 for success, oters for failure
* @return 0 for success, others for failure
*/
MPP_RET (*enqueue)(MppCtx ctx, MppPortType type, MppTask task);
@@ -258,7 +265,7 @@ typedef struct MppApi_t {
* @param ctx The context of mpp
* @param cmd The mpi command
* @param param The mpi command parameter
* @return 0 for success, oters for failure
* @return 0 for success, others for failure
*/
MPP_RET (*control)(MppCtx ctx, MpiCmd cmd, MppParam param);

View File

@@ -18,6 +18,7 @@
#include <string.h>
#include "mpp_env.h"
#include "mpp_log.h"
#include "mpp_mem.h"
@@ -25,15 +26,25 @@
#define MAX_TASK_COUNT 8
#define MPP_TASK_DBG_FUNCTION (0x00000001)
#define mpp_task_dbg(flag, fmt, ...) _mpp_dbg(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_task_debug, flag, fmt, ## __VA_ARGS__)
#define mpp_task_dbg_func(fmt, ...) mpp_task_dbg_f(MPP_TASK_DBG_FUNCTION, fmt, ## __VA_ARGS__)
typedef struct MppTaskStatusInfo_t {
struct list_head list;
RK_S32 count;
MppTaskStatus status;
Condition *cond;
} MppTaskStatusInfo;
typedef struct MppTaskQueueImpl_t {
Mutex *lock;
RK_S32 task_count;
RK_S32 ready; // flag for deinit
Condition *finish_done; // condition for deinit done
// two ports inside of task queue
MppPort input;
@@ -55,6 +66,8 @@ typedef struct MppPortImpl_t {
static const char *module_name = MODULE_TAG;
RK_U32 mpp_task_debug = 0;
void setup_mpp_task_name(MppTaskImpl *task)
{
task->name = module_name;
@@ -78,6 +91,8 @@ static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort
return MPP_ERR_MALLOC;
}
mpp_task_dbg_func("enter queue %p type %d\n", queue, type);
impl->type = type;
impl->queue = queue;
@@ -93,49 +108,107 @@ static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort
*port = (MppPort *)impl;
mpp_task_dbg_func("leave queue %p port %p\n", queue, impl);
return MPP_OK;
}
static MPP_RET mpp_port_deinit(MppPort port)
{
mpp_task_dbg_func("enter port %p\n", port);
mpp_free(port);
mpp_task_dbg_func("leave\n");
return MPP_OK;
}
MPP_RET mpp_port_can_dequeue(MppPort port)
MPP_RET mpp_port_poll(MppPort port, MppPollType timeout)
{
MppPortImpl *port_impl = (MppPortImpl *)port;
MppTaskQueueImpl *queue = port_impl->queue;
AutoMutex auto_lock(queue->lock);
MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr];
MppTaskStatusInfo *curr = NULL;
MPP_RET ret = MPP_NOK;
mpp_task_dbg_func("enter port %p timeout %d\n", port, timeout);
if (!queue->ready) {
mpp_err("try to query when %s queue is not ready\n",
(port_impl->type == MPP_PORT_INPUT) ?
("input") : ("output"));
goto RET;
}
curr = &queue->info[port_impl->status_curr];
if (curr->count) {
mpp_assert(!list_empty(&curr->list));
ret = MPP_OK;
} else {
mpp_assert(list_empty(&curr->list));
/* timeout
* zero - non-block
* negtive - block
* positive - timeout value
*/
if (timeout != MPP_POLL_NON_BLOCK) {
mpp_assert(curr->cond);
Condition *cond = curr->cond;
RK_S32 wait_ret = 0;
if (timeout == MPP_POLL_BLOCK) {
mpp_task_dbg_func("port %p block wait start\n", port);
wait_ret = cond->wait(queue->lock);
mpp_task_dbg_func("port %p block wait done ret %d\n", port, wait_ret);
} else {
RK_S64 time = ((RK_S64)(timeout / 1000) << 32) + (timeout % 1000);
mpp_task_dbg_func("port %p timed wait start %d\n", port, timeout);
wait_ret = cond->timedwait(queue->lock, time);
mpp_task_dbg_func("port %p timed wait done ret %d\n", port, wait_ret);
}
if (curr->count) {
mpp_assert(!list_empty(&curr->list));
return MPP_OK;
ret = MPP_OK;
}
mpp_assert(list_empty(&curr->list));
return MPP_NOK;
}
}
RET:
mpp_task_dbg_func("leave port %p ret %d\n", port, ret);
return ret;
}
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task)
{
MppPortImpl *port_impl = (MppPortImpl *)port;
MppTaskQueueImpl *queue = port_impl->queue;
MppTaskStatusInfo *curr = NULL;
MppTaskStatusInfo *next = NULL;
MppTaskImpl *task_impl = NULL;
MppTask p = NULL;
AutoMutex auto_lock(queue->lock);
MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr];
MppTaskStatusInfo *next = &queue->info[port_impl->next_on_dequeue];
MPP_RET ret = MPP_NOK;
mpp_task_dbg_func("enter port %p\n", port);
if (!queue->ready) {
mpp_err("try to dequeue when %s queue is not ready\n",
(port_impl->type == MPP_PORT_INPUT) ?
("input") : ("output"));
goto RET;
}
curr = &queue->info[port_impl->status_curr];
next = &queue->info[port_impl->next_on_dequeue];
*task = NULL;
if (curr->count == 0) {
mpp_assert(list_empty(&curr->list));
return MPP_OK;
goto RET;
}
MppTaskImpl *task_impl = list_entry(curr->list.next, MppTaskImpl, list);
MppTask p = (MppTask)task_impl;
mpp_assert(!list_empty(&curr->list));
task_impl = list_entry(curr->list.next, MppTaskImpl, list);
p = (MppTask)task_impl;
check_mpp_task_name(p);
list_del_init(&task_impl->list);
curr->count--;
@@ -146,8 +219,11 @@ MPP_RET mpp_port_dequeue(MppPort port, MppTask *task)
task_impl->status = next->status;
*task = p;
ret = MPP_OK;
RET:
mpp_task_dbg_func("leave port %p ret %d\n", port, ret);
return MPP_OK;
return ret;
}
MPP_RET mpp_port_enqueue(MppPort port, MppTask task)
@@ -155,14 +231,28 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task)
MppTaskImpl *task_impl = (MppTaskImpl *)task;
MppPortImpl *port_impl = (MppPortImpl *)port;
MppTaskQueueImpl *queue = port_impl->queue;
MppTaskStatusInfo *curr = NULL;
MppTaskStatusInfo *next = NULL;
AutoMutex auto_lock(queue->lock);
MPP_RET ret = MPP_NOK;
mpp_task_dbg_func("enter port %p\n", port);
if (!queue->ready) {
mpp_err("try to enqueue when %s queue is not ready\n",
(port_impl->type == MPP_PORT_INPUT) ?
("input") : ("output"));
goto RET;
}
check_mpp_task_name(task);
mpp_assert(task_impl->queue == (MppTaskQueue *)queue);
mpp_assert(task_impl->status == port_impl->next_on_dequeue);
AutoMutex auto_lock(queue->lock);
MppTaskStatusInfo *curr = &queue->info[task_impl->status];
MppTaskStatusInfo *next = &queue->info[port_impl->next_on_enqueue];
curr = &queue->info[task_impl->status];
next = &queue->info[port_impl->next_on_enqueue];
list_del_init(&task_impl->list);
curr->count--;
@@ -170,7 +260,13 @@ MPP_RET mpp_port_enqueue(MppPort port, MppTask task)
next->count++;
task_impl->status = next->status;
return MPP_OK;
next->cond->signal();
mpp_task_dbg_func("signal port %p\n", next);
ret = MPP_OK;
RET:
mpp_task_dbg_func("leave port %p ret %d\n", port, ret);
return ret;
}
MPP_RET mpp_task_queue_init(MppTaskQueue *queue)
@@ -180,54 +276,71 @@ MPP_RET mpp_task_queue_init(MppTaskQueue *queue)
return MPP_ERR_NULL_PTR;
}
MPP_RET ret = MPP_NOK;
MppTaskQueueImpl *p = NULL;
MppTaskImpl *tasks = NULL;
Mutex *lock = NULL;
do {
Condition *cond[MPP_TASK_STATUS_BUTT] = { NULL };
RK_S32 i;
mpp_env_get_u32("mpp_task_debug", &mpp_task_debug, 0);
mpp_task_dbg_func("enter\n");
*queue = NULL;
p = mpp_calloc(MppTaskQueueImpl, 1);
if (NULL == p) {
mpp_err_f("malloc queue failed\n");
break;
goto RET;
}
lock = new Mutex();
if (NULL == lock) {
mpp_err_f("new lock failed\n");
break;;
cond[MPP_INPUT_PORT] = new Condition();
cond[MPP_INPUT_HOLD] = NULL;
cond[MPP_OUTPUT_PORT] = new Condition();
cond[MPP_OUTPUT_HOLD] = NULL;
if (NULL == cond[MPP_INPUT_PORT] ||
NULL == cond[MPP_OUTPUT_PORT]) {
mpp_err_f("new condition failed\n");
goto RET;
}
for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
INIT_LIST_HEAD(&p->info[i].list);
p->info[i].count = 0;
p->info[i].status = (MppTaskStatus)i;
p->info[i].cond = cond[i];
}
lock = new Mutex();
if (NULL == lock) {
mpp_err_f("new lock failed\n");
goto RET;
}
p->lock = lock;
p->tasks = tasks;
if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
break;
goto RET;
if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
mpp_port_deinit(p->input);
break;
goto RET;
}
ret = MPP_OK;
RET:
if (ret) {
if (lock)
delete lock;
MPP_FREE(cond[MPP_INPUT_PORT]);
MPP_FREE(cond[MPP_OUTPUT_PORT]);
MPP_FREE(p);
}
*queue = p;
return MPP_OK;
} while (0);
if (p)
mpp_free(p);
if (lock)
delete lock;
if (tasks)
mpp_free(tasks);
*queue = NULL;
return MPP_NOK;
mpp_task_dbg_func("leave ret %d queue %p\n", ret, p);
return ret;
}
MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
@@ -260,6 +373,7 @@ MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
list_add_tail(&tasks[i].list, &info->list);
info->count++;
}
impl->ready = 1;
return MPP_OK;
}
@@ -271,14 +385,11 @@ MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
}
MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
if (p->input) {
mpp_port_deinit(p->input);
p->input = NULL;
}
if (p->output) {
mpp_port_deinit(p->output);
p->output = NULL;
}
p->lock->lock();
p->ready = 0;
p->info[MPP_INPUT_PORT].cond->signal();
p->info[MPP_OUTPUT_PORT].cond->signal();
if (p->tasks) {
for (RK_S32 i = 0; i < p->task_count; i++) {
/* we must ensure that all task return to init status */
@@ -288,6 +399,16 @@ MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
}
mpp_free(p->tasks);
}
if (p->input) {
mpp_port_deinit(p->input);
p->input = NULL;
}
if (p->output) {
mpp_port_deinit(p->output);
p->output = NULL;
}
p->lock->unlock();
if (p->lock)
delete p->lock;
mpp_free(p);

View File

@@ -903,8 +903,12 @@ void *mpp_dec_advanced_thread(void *data)
mpp_buf_slot_ready(frame_slots);
}
if (slot_size != buffer_size) {
mpp_err_f("slot size %d is not equal to buffer size %d\n",
slot_size, buffer_size);
mpp_assert(slot_size == buffer_size);
}
}
mpp_buf_slot_set_prop(frame_slots, task_dec->output, SLOT_BUFFER, output_buffer);
@@ -943,6 +947,7 @@ void *mpp_dec_advanced_thread(void *data)
mpp_task = NULL;
// send finished task to output port
mpp_port_poll(output, MPP_POLL_BLOCK);
mpp_port_dequeue(output, &mpp_task);
mpp_task_meta_set_frame(mpp_task, KEY_OUTPUT_FRAME, frame);

View File

@@ -171,6 +171,7 @@ void *mpp_enc_control_thread(void *data)
mpp_task = NULL;
// send finished task to output port
mpp_port_poll(output, MPP_POLL_BLOCK);
mpp_port_dequeue(output, &mpp_task);
mpp_task_meta_set_packet(mpp_task, KEY_OUTPUT_PACKET, packet);

View File

@@ -183,8 +183,6 @@ VpuApiLegacy::VpuApiLegacy() :
init_ok(0),
frame_count(0),
set_eos(0),
block_input(0),
block_output(0),
fp(NULL),
fp_buf(NULL),
memGroup(NULL),
@@ -242,10 +240,15 @@ RK_S32 VpuApiLegacy::init(VpuCodecContext *ctx, RK_U8 *extraData, RK_U32 extra_s
}
if (CODEC_DECODER == ctx->codecType) {
block_input = 0;
block_output = 0;
type = MPP_CTX_DEC;
} else if (CODEC_ENCODER == ctx->codecType) {
MppPollType block = MPP_POLL_BLOCK;
/* setup input / output block mode */
ret = mpi->control(mpp_ctx, MPP_SET_INPUT_BLOCK, (MppParam)&block);
if (MPP_OK != ret)
mpp_err("mpi->control MPP_SET_INPUT_BLOCK failed\n");
if (memGroup == NULL) {
ret = mpp_buffer_group_get_internal(&memGroup, MPP_BUFFER_TYPE_ION);
if (MPP_OK != ret) {
@@ -253,24 +256,12 @@ RK_S32 VpuApiLegacy::init(VpuCodecContext *ctx, RK_U8 *extraData, RK_U32 extra_s
return ret;
}
}
block_input = 1;
block_output = 0;
type = MPP_CTX_ENC;
} else {
return MPP_ERR_VPU_CODEC_INIT;
}
/* setup input / output block mode */
ret = mpi->control(mpp_ctx, MPP_SET_INPUT_BLOCK, (MppParam)&block_input);
if (MPP_OK != ret) {
mpp_err("mpi->control MPP_SET_INPUT_BLOCK failed\n");
}
ret = mpi->control(mpp_ctx, MPP_SET_OUTPUT_BLOCK, (MppParam)&block_output);
if (MPP_OK != ret) {
mpp_err("mpi->control MPP_SET_OUTPUT_BLOCK failed\n");
}
ret = mpp_init(mpp_ctx, type, (MppCodingType)ctx->videoCoding);
if (ret) {
mpp_err_f(" init error. \n");
@@ -531,23 +522,21 @@ RK_S32 VpuApiLegacy::decode(VpuCodecContext *ctx, VideoPacket_t *pkt, DecoderOut
vpu_api_dbg_func("mpp import input fd %d output fd %d",
mpp_buffer_get_fd(str_buf), mpp_buffer_get_fd(pic_buf));
do {
ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp input poll failed\n");
goto DECODE_OUT;
}
ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task);
if (ret) {
mpp_err("mpp task input dequeue failed\n");
goto DECODE_OUT;
}
if (task == NULL) {
vpu_api_dbg_func("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
msleep(3);
} else
break;
} while (1);
mpp_task_meta_set_packet(task, KEY_INPUT_PACKET, packet);
mpp_task_meta_set_frame (task, KEY_OUTPUT_FRAME, mframe);
if (mpi != NULL) {
ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task);
if (ret) {
mpp_err("mpp task input enqueue failed\n");
@@ -557,7 +546,12 @@ RK_S32 VpuApiLegacy::decode(VpuCodecContext *ctx, VideoPacket_t *pkt, DecoderOut
pkt->size = 0;
task = NULL;
do {
ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp output poll failed\n");
goto DECODE_OUT;
}
ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task);
if (ret) {
mpp_err("ret %d mpp task output dequeue failed\n", ret);
@@ -578,13 +572,6 @@ RK_S32 VpuApiLegacy::decode(VpuCodecContext *ctx, VideoPacket_t *pkt, DecoderOut
goto DECODE_OUT;
}
task = NULL;
break;
}
msleep(3);
} while (1);
} else {
mpp_err("mpi pointer is NULL, failed!");
}
// copy decoded frame into output buffer, and set outpub frame size
@@ -700,16 +687,13 @@ RK_S32 VpuApiLegacy::decode_sendstream(VideoPacket_t *pkt)
vpu_api_dbg_input("input size %-6d flag %x pts %lld\n",
pkt->size, pkt->nFlags, pkt->pts);
do {
ret = mpi->decode_put_packet(mpp_ctx, mpkt);
if (ret == MPP_OK) {
pkt->size = 0;
break;
} else {
/* reduce cpu overhead here */
msleep(1);
}
} while (block_input);
mpp_packet_deinit(&mpkt);
@@ -896,23 +880,25 @@ RK_S32 VpuApiLegacy::encode(VpuCodecContext *ctx, EncInputStream_t *aEncInStrm,
vpu_api_dbg_func("mpp import input fd %d output fd %d",
mpp_buffer_get_fd(pic_buf), mpp_buffer_get_fd(str_buf));
do {
ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp input poll failed\n");
goto ENCODE_OUT;
}
ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task);
if (ret) {
mpp_err("mpp task input dequeue failed\n");
goto ENCODE_OUT;
}
if (task == NULL) {
vpu_api_dbg_func("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
msleep(3);
} else
break;
} while (1);
mpp_err("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
goto ENCODE_OUT;
}
mpp_task_meta_set_frame (task, KEY_INPUT_FRAME, frame);
mpp_task_meta_set_packet(task, KEY_OUTPUT_PACKET, packet);
if (mpi != NULL) {
ret = mpi->enqueue(mpp_ctx, MPP_PORT_INPUT, task);
if (ret) {
mpp_err("mpp task input enqueue failed\n");
@@ -920,13 +906,20 @@ RK_S32 VpuApiLegacy::encode(VpuCodecContext *ctx, EncInputStream_t *aEncInStrm,
}
task = NULL;
do {
ret = mpi->poll(mpp_ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp output poll failed\n");
goto ENCODE_OUT;
}
ret = mpi->dequeue(mpp_ctx, MPP_PORT_OUTPUT, &task);
if (ret) {
mpp_err("ret %d mpp task output dequeue failed\n", ret);
goto ENCODE_OUT;
}
mpp_assert(task);
if (task) {
MppFrame frame_out = NULL;
MppFrame packet_out = NULL;
@@ -944,11 +937,17 @@ RK_S32 VpuApiLegacy::encode(VpuCodecContext *ctx, EncInputStream_t *aEncInStrm,
}
task = NULL;
ret = mpi->poll(mpp_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp input poll failed\n");
goto ENCODE_OUT;
}
// dequeue task from MPP_PORT_INPUT
ret = mpi->dequeue(mpp_ctx, MPP_PORT_INPUT, &task);
if (ret) {
mpp_log_f("failed to dequeue from input port ret %d\n", ret);
break;
goto ENCODE_OUT;
}
mpp_assert(task);
ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out);
@@ -959,17 +958,10 @@ RK_S32 VpuApiLegacy::encode(VpuCodecContext *ctx, EncInputStream_t *aEncInStrm,
goto ENCODE_OUT;
}
task = NULL;
break;
}
msleep(3);
} while (1);
} else {
mpp_err("mpi pointer is NULL, failed!");
}
// copy encoded stream into output buffer, and set output stream size
if (packet != NULL) {
if (packet) {
RK_U32 eos = mpp_packet_get_eos(packet);
RK_S64 pts = mpp_packet_get_pts(packet);
RK_U32 flag = mpp_packet_get_flag(packet);

View File

@@ -73,9 +73,6 @@ private:
RK_U32 frame_count;
RK_U32 set_eos;
RK_U32 block_input;
RK_U32 block_output;
FILE *fp;
RK_U8 *fp_buf;

View File

@@ -285,6 +285,32 @@ static MPP_RET mpi_isp_get_frame(MppCtx ctx, MppFrame *frame)
return ret;
}
static MPP_RET mpi_poll(MppCtx ctx, MppPortType type, MppPollType timeout)
{
MPP_RET ret = MPP_NOK;
MpiImpl *p = (MpiImpl *)ctx;
mpi_dbg_func("enter ctx %p type %d timeout %d\n", ctx, type, timeout);
do {
ret = check_mpp_ctx(p);
if (ret)
break;;
if (type >= MPP_PORT_BUTT ||
timeout < MPP_POLL_BUTT ||
timeout > MPP_POLL_MAX) {
mpp_err_f("invalid input type %d timeout %d\n", type, timeout);
ret = MPP_ERR_UNKNOW;
break;
}
ret = p->ctx->poll(type, timeout);
} while (0);
mpi_dbg_func("leave ret %d\n", ret);
return ret;
}
static MPP_RET mpi_dequeue(MppCtx ctx, MppPortType type, MppTask *task)
{
MPP_RET ret = MPP_NOK;
@@ -381,6 +407,7 @@ static MppApi mpp_api = {
mpi_isp,
mpi_isp_put_frame,
mpi_isp_get_frame,
mpi_poll,
mpi_dequeue,
mpi_enqueue,
mpi_reset,

View File

@@ -50,8 +50,8 @@ Mpp::Mpp()
mOutputPort(NULL),
mInputTaskQueue(NULL),
mOutputTaskQueue(NULL),
mInputBlock(0),
mOutputBlock(0),
mInputBlock(MPP_POLL_NON_BLOCK),
mOutputBlock(MPP_POLL_NON_BLOCK),
mThreadCodec(NULL),
mThreadHal(NULL),
mDec(NULL),
@@ -60,7 +60,6 @@ Mpp::Mpp()
mCoding(MPP_VIDEO_CodingUnused),
mInitDone(0),
mMultiFrame(0),
mInputTask(NULL),
mStatus(0),
mParserFastMode(0),
mParserNeedSplit(0),
@@ -270,8 +269,9 @@ MPP_RET Mpp::get_frame(MppFrame *frame)
if (0 == mFrames->list_size()) {
mThreadCodec->signal();
if (mOutputBlock)
if (mOutputBlock == MPP_POLL_BLOCK)
mFrames->wait();
/* NOTE: this sleep is to avoid user's dead loop */
msleep(1);
}
@@ -302,49 +302,50 @@ MPP_RET Mpp::put_frame(MppFrame frame)
return MPP_NOK;
MPP_RET ret = MPP_NOK;
MppTask task = mInputTask;
MppTask task = NULL;
do {
if (NULL == task) {
ret = dequeue(MPP_PORT_INPUT, &task);
ret = poll(MPP_PORT_INPUT, mInputBlock);
if (ret) {
mpp_log_f("failed to dequeue from input port ret %d\n", ret);
break;
}
mpp_log_f("poll on set timeout %d ret %d\n", mInputBlock, ret);
goto RET;
}
/* FIXME: use wait to do block wait */
if (mInputBlock && NULL == task) {
msleep(2);
continue;
ret = dequeue(MPP_PORT_INPUT, &task);
if (ret || NULL == task) {
mpp_log_f("dequeue on set ret %d task %p\n", ret, task);
goto RET;
}
mpp_assert(task);
ret = mpp_task_meta_set_frame(task, KEY_INPUT_FRAME, frame);
if (ret) {
mpp_log_f("failed to set input frame to task ret %d\n", ret);
break;
mpp_log_f("set input frame to task ret %d\n", ret);
goto RET;
}
ret = enqueue(MPP_PORT_INPUT, task);
if (ret) {
mpp_log_f("failed to enqueue task to input port ret %d\n", ret);
break;
mpp_log_f("enqueue ret %d\n", ret);
goto RET;
}
if (mInputBlock) {
while (MPP_NOK == mpp_port_can_dequeue(mInputPort)) {
msleep(2);
ret = poll(MPP_PORT_INPUT, mInputBlock);
if (ret) {
mpp_log_f("poll on get timeout %d ret %d\n", mInputBlock, ret);
goto RET;
}
ret = dequeue(MPP_PORT_INPUT, &task);
if (ret) {
mpp_log_f("failed to dequeue from input port ret %d\n", ret);
break;
mpp_log_f("dequeue on get ret %d\n", ret);
goto RET;
}
if (mInputBlock != MPP_POLL_NON_BLOCK)
mpp_assert(task);
if (task) {
ret = mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame);
if (frame) {
mpp_frame_deinit(&frame);
@@ -352,11 +353,7 @@ MPP_RET Mpp::put_frame(MppFrame frame)
}
}
break;
} while (1);
mInputTask = task;
RET:
return ret;
}
@@ -368,31 +365,24 @@ MPP_RET Mpp::get_packet(MppPacket *packet)
MPP_RET ret = MPP_OK;
MppTask task = NULL;
do {
if (NULL == task) {
ret = dequeue(MPP_PORT_OUTPUT, &task);
ret = poll(MPP_PORT_OUTPUT, mOutputBlock);
if (ret) {
mpp_log_f("failed to dequeue from output port ret %d\n", ret);
break;
}
mpp_log_f("poll on get timeout %d ret %d\n", mOutputBlock, ret);
goto RET;
}
/* FIXME: use wait to do block wait */
if (NULL == task) {
if (mOutputBlock) {
msleep(2);
continue;
} else {
break;
}
ret = dequeue(MPP_PORT_OUTPUT, &task);
if (ret || NULL == task) {
mpp_log_f("dequeue on get ret %d task %p\n", ret, task);
goto RET;
}
mpp_assert(task);
ret = mpp_task_meta_get_packet(task, KEY_OUTPUT_PACKET, packet);
if (ret) {
mpp_log_f("failed to get output packet from task ret %d\n", ret);
break;
mpp_log_f("get output packet from task ret %d\n", ret);
goto RET;
}
mpp_assert(*packet);
@@ -401,12 +391,35 @@ MPP_RET Mpp::get_packet(MppPacket *packet)
mpp_log_f("pts %lld\n", mpp_packet_get_pts(*packet));
ret = enqueue(MPP_PORT_OUTPUT, task);
if (ret) {
mpp_log_f("failed to enqueue task to output port ret %d\n", ret);
if (ret)
mpp_log_f("enqueue on set ret %d\n", ret);
RET:
return ret;
}
break;
} while (1);
MPP_RET Mpp::poll(MppPortType type, MppPollType timeout)
{
if (!mInitDone)
return MPP_NOK;
MPP_RET ret = MPP_NOK;
AutoMutex autoLock(mPortLock);
MppTaskQueue port = NULL;
switch (type) {
case MPP_PORT_INPUT : {
port = mInputPort;
} break;
case MPP_PORT_OUTPUT : {
port = mOutputPort;
} break;
default : {
} break;
}
if (port)
ret = mpp_port_poll(port, timeout);
return ret;
}
@@ -419,6 +432,7 @@ MPP_RET Mpp::dequeue(MppPortType type, MppTask *task)
MPP_RET ret = MPP_NOK;
AutoMutex autoLock(mPortLock);
MppTaskQueue port = NULL;
switch (type) {
case MPP_PORT_INPUT : {
port = mInputPort;
@@ -444,6 +458,7 @@ MPP_RET Mpp::enqueue(MppPortType type, MppTask task)
MPP_RET ret = MPP_NOK;
AutoMutex autoLock(mPortLock);
MppTaskQueue port = NULL;
switch (type) {
case MPP_PORT_INPUT : {
port = mInputPort;
@@ -581,11 +596,11 @@ MPP_RET Mpp::control_mpp(MpiCmd cmd, MppParam param)
switch (cmd) {
case MPP_SET_INPUT_BLOCK: {
RK_U32 block = *((RK_U32 *)param);
MppPollType block = *((MppPollType *)param);
mInputBlock = block;
} break;
case MPP_SET_OUTPUT_BLOCK: {
RK_U32 block = *((RK_U32 *)param);
MppPollType block = *((MppPollType *)param);
mOutputBlock = block;
} break;
default : {

View File

@@ -71,6 +71,7 @@ public:
MPP_RET put_frame(MppFrame frame);
MPP_RET get_packet(MppPacket *packet);
MPP_RET poll(MppPortType type, MppPollType timeout);
MPP_RET dequeue(MppPortType type, MppTask *task);
MPP_RET enqueue(MppPortType type, MppTask task);
@@ -109,8 +110,8 @@ public:
MppTaskQueue mInputTaskQueue;
MppTaskQueue mOutputTaskQueue;
RK_U32 mInputBlock;
RK_U32 mOutputBlock;
MppPollType mInputBlock;
MppPollType mOutputBlock;
/*
* There are two threads for each decoder/encoder: codec thread and hal thread
*
@@ -135,9 +136,6 @@ private:
RK_U32 mInitDone;
RK_U32 mMultiFrame;
// task for put_frame / put_packet
MppTask mInputTask;
RK_U32 mStatus;
/* decoder paramter before init */

View File

@@ -266,20 +266,19 @@ static int decode_advanced(MpiDecLoopData *data)
if (pkt_eos)
mpp_packet_set_eos(packet);
do {
ret = mpi->poll(ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp input poll failed\n");
return ret;
}
ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task); /* input queue */
if (ret) {
mpp_err("mpp task input dequeue failed\n");
return ret;
}
if (task == NULL) {
mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
msleep(3);
} else {
break;
}
} while (1);
mpp_assert(task);
mpp_task_meta_set_packet(task, KEY_INPUT_PACKET, packet);
mpp_task_meta_set_frame (task, KEY_OUTPUT_FRAME, frame);
@@ -290,15 +289,21 @@ static int decode_advanced(MpiDecLoopData *data)
return ret;
}
msleep(20);
/* poll and wait here */
ret = mpi->poll(ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp output poll failed\n");
return ret;
}
do {
ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task); /* output queue */
if (ret) {
mpp_err("mpp task output dequeue failed\n");
return ret;
}
mpp_assert(task);
if (task) {
MppFrame frame_out = NULL;
mpp_task_meta_get_frame(task, KEY_OUTPUT_FRAME, &frame_out);
@@ -322,14 +327,11 @@ static int decode_advanced(MpiDecLoopData *data)
mpp_log("found eos frame\n");
}
ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task); /* output queue */
if (ret) {
/* output queue */
ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task);
if (ret)
mpp_err("mpp task output enqueue failed\n");
return ret;
}
break;
}
} while (1);
return ret;
}
@@ -419,7 +421,7 @@ int mpi_dec_test_decode(MpiDecTestCmd *cmd)
goto MPP_TEST_OUT;
}
ret = mpp_buffer_get(data.frm_grp, &frm_buf, width * height * 3 / 2);
ret = mpp_buffer_get(data.frm_grp, &frm_buf, width * height * 2);
if (ret) {
mpp_err("failed to get buffer for input frame ret %d\n", ret);
goto MPP_TEST_OUT;

View File

@@ -461,26 +461,25 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
mpp_packet_init_with_buffer(&packet, pkt_buf_out);
do {
ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task);
ret = mpi->poll(ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp task input dequeue failed\n");
mpp_err("mpp task input poll failed ret %d\n", ret);
goto MPP_TEST_OUT;
}
if (task == NULL) {
mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
msleep(3);
} else {
ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task);
if (ret || NULL == task) {
mpp_err("mpp task input dequeue failed ret %d task %p\n", ret, task);
goto MPP_TEST_OUT;
}
if (task) {
MppFrame frame_out = NULL;
mpp_task_meta_get_frame(task, KEY_INPUT_FRAME, &frame_out);
if (frame_out)
mpp_assert(frame_out == frame);
break;
}
} while (1);
mpp_task_meta_set_frame (task, KEY_INPUT_FRAME, frame);
mpp_task_meta_set_packet(task, KEY_OUTPUT_PACKET, packet);
@@ -513,12 +512,15 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
goto MPP_TEST_OUT;
}
msleep(20);
do {
ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task);
ret = mpi->poll(ctx, MPP_PORT_OUTPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp task output dequeue failed\n");
mpp_err("mpp task output poll failed ret %d\n", ret);
goto MPP_TEST_OUT;
}
ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task);
if (ret || NULL == task) {
mpp_err("mpp task output dequeue failed ret %d task %p\n", ret, task);
goto MPP_TEST_OUT;
}
@@ -554,9 +556,7 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
mpp_err("mpp task output enqueue failed\n");
goto MPP_TEST_OUT;
}
break;
}
} while (1);
if (num_frames && frame_count >= num_frames) {
mpp_log_f("encode max %d frames", frame_count);

View File

@@ -821,25 +821,25 @@ static MPP_RET mpi_rc_codec(MpiRcTestCtx *ctx)
mpp_packet_init_with_buffer(&packet, pkt_buf_out);
do {
ret = enc_mpi->poll(enc_ctx, MPP_PORT_INPUT, MPP_POLL_BLOCK);
if (ret) {
mpp_err("mpp input poll failed\n");
goto MPP_TEST_OUT;
}
ret = enc_mpi->dequeue(enc_ctx, MPP_PORT_INPUT, &enc_task);
if (ret) {
mpp_err("mpp task input dequeue failed\n");
goto MPP_TEST_OUT;
}
if (enc_task == NULL) {
mpp_log("mpi dequeue from MPP_PORT_INPUT fail, task equal with NULL!");
msleep(3);
} else {
MppFrame tmp_frm = NULL;
mpp_assert(enc_task);
{
MppFrame tmp_frm = NULL;
mpp_task_meta_get_frame(enc_task, KEY_INPUT_FRAME, &tmp_frm);
if (tmp_frm)
mpp_assert(tmp_frm == frame_in);
break;
}
} while (1);
mpp_task_meta_set_frame (enc_task, KEY_INPUT_FRAME, frame_in);
mpp_task_meta_set_packet(enc_task, KEY_OUTPUT_PACKET, packet);