[mpp_task]: add advanced task flow to mpp, mpi_enc_test can work without error

git-svn-id: https://10.10.10.66:8443/svn/MediaProcessPlatform/trunk/mpp@1035 6e48237b-75ef-9749-8fc9-41990f28c85a
This commit is contained in:
ChenHengming
2016-07-16 10:13:35 +00:00
parent 947da74b5d
commit ac9505c16c
10 changed files with 450 additions and 215 deletions

View File

@@ -42,7 +42,14 @@
*/
/*
* mpp has two ports: input and output
* One mpp task queue has two ports: input and output
*
* The whole picture is:
* Top layer mpp has two ports: mpp_input_port and mpp_output_port
* But internally these two ports belongs to two task queue.
* The mpp_input_port is the mpp_input_task_queue's input port.
* The mpp_output_port is the mpp_output_task_queue's output port.
*
* Each port uses its task queue to communication
*/
typedef enum {
@@ -175,11 +182,46 @@ typedef enum {
*/
/* NOTE: use index rather then handle to descripbe task */
typedef void* MppTask;
typedef void* MppPort;
typedef void* MppTaskQueue;
#ifdef __cplusplus
extern "C" {
#endif
/*
* Mpp task queue function:
*
* mpp_task_queue_init - create task queue structure
* mpp_task_queue_deinit - destory task queue structure
* mpp_task_queue_get_port - return input or output port of task queue
*
* Typical work flow, task mpp_dec for example:
*
* 1. Mpp layer creates one task queue in order to connect mpp input and mpp_dec input.
* 2. Mpp layer setups the task count in task queue input port.
* 3. Get input port from the task queue and assign to mpp input as mpp_input_port.
* 4. Get output port from the task queue and assign to mpp_dec input as dec_input_port.
* 5. Let the loop start.
* a. mpi user will dequeue task from mpp_input_port.
* b. mpi user will setup task.
* c. mpi user will enqueue task back to mpp_input_port.
* d. task will automatically transfer to dec_input_port.
* e. mpp_dec will dequeue task from dec_input_port.
* f. mpp_dec will process task.
* g. mpp_dec will enqueue task back to dec_input_port.
* h. task will automatically transfer to mpp_input_port.
* 6. Stop the loop. All tasks must be return to input port with idle status.
* 6. Mpp layer destory the task queue.
*/
MPP_RET mpp_task_queue_init(MppTaskQueue *queue);
MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count);
MPP_RET mpp_task_queue_deinit(MppTaskQueue queue);
MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type);
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task);
MPP_RET mpp_port_enqueue(MppPort port, MppTask task);
MPP_RET mpp_task_meta_set_s32(MppTask task, MppMetaKey key, RK_S32 val);
MPP_RET mpp_task_meta_set_s64(MppTask task, MppMetaKey key, RK_S64 val);
MPP_RET mpp_task_meta_set_ptr(MppTask task, MppMetaKey key, void *val);

View File

@@ -52,19 +52,17 @@
*
*/
typedef enum MppTaskStatus_e {
MPP_EXTERNAL_QUEUE, /* in external queue and ready for external dequeue */
MPP_EXTERNAL_HOLD, /* dequeued and hold by external user, user will config */
MPP_INTERNAL_QUEUE, /* in mpp internal work queue and ready for mpp dequeue */
MPP_INTERNAL_HOLD, /* dequeued and hold by mpp internal worker, mpp is processing */
MPP_TASK_BUTT,
MPP_INPUT_PORT, /* in external queue and ready for external dequeue */
MPP_INPUT_HOLD, /* dequeued and hold by external user, user will config */
MPP_OUTPUT_PORT, /* in mpp internal work queue and ready for mpp dequeue */
MPP_OUTPUT_HOLD, /* dequeued and hold by mpp internal worker, mpp is processing */
MPP_TASK_STATUS_BUTT,
} MppTaskStatus;
typedef void* MppPort;
typedef struct MppTaskImpl_t {
const char *name;
struct list_head list;
MppPort *port;
MppTaskQueue *queue;
RK_S32 index;
MppTaskStatus status;
@@ -77,22 +75,6 @@ extern "C" {
MPP_RET check_mpp_task_name(MppTask task);
/*
* mpp_port_init:
* initialize port with task count and initial status
* group - return port pointer
* type - initial queue for all tasks
* task_count - total task count for this task group
*/
MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count);
MPP_RET mpp_port_deinit(MppPort port);
MPP_RET mpp_port_can_dequeue(MppPort port);
MPP_RET mpp_port_can_enqueue(MppPort port);
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task);
MPP_RET mpp_port_enqueue(MppPort port, MppTask task);
#ifdef __cplusplus
}
#endif

View File

@@ -25,15 +25,32 @@
#define MAX_TASK_COUNT 8
typedef struct MppTaskGroupImpl_t {
MppPortType type;
RK_S32 task_count;
typedef struct MppTaskStatusInfo_t {
struct list_head list;
RK_S32 count;
MppTaskStatus status;
} MppTaskStatusInfo;
typedef struct MppTaskQueueImpl_t {
Mutex *lock;
RK_S32 task_count;
// two ports inside of task queue
MppPort input;
MppPort output;
MppTaskImpl *tasks;
struct list_head list[MPP_TASK_BUTT];
RK_S32 count[MPP_TASK_BUTT];
MppTaskStatusInfo info[MPP_TASK_STATUS_BUTT];
} MppTaskQueueImpl;
typedef struct MppPortImpl_t {
MppPortType type;
MppTaskQueueImpl *queue;
MppTaskStatus status_curr;
MppTaskStatus next_on_dequeue;
MppTaskStatus next_on_enqueue;
} MppPortImpl;
static const char *module_name = MODULE_TAG;
@@ -53,22 +70,109 @@ MPP_RET check_mpp_task_name(MppTask task)
return MPP_NOK;
}
MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count)
static MPP_RET mpp_port_init(MppTaskQueueImpl *queue, MppPortType type, MppPort *port)
{
if (NULL == port || type >= MPP_PORT_BUTT || task_count > MAX_TASK_COUNT) {
mpp_err_f("invalid input port %p type %d count %d\n", port, type, task_count);
return MPP_ERR_UNKNOW;
MppPortImpl *impl = mpp_malloc(MppPortImpl, 1);
if (NULL == impl) {
mpp_err_f("failed to malloc MppPort type %d\n", type);
return MPP_ERR_MALLOC;
}
MppPortImpl *p = NULL;
impl->type = type;
impl->queue = queue;
if (MPP_PORT_INPUT == type) {
impl->status_curr = MPP_INPUT_PORT;
impl->next_on_dequeue = MPP_INPUT_HOLD;
impl->next_on_enqueue = MPP_OUTPUT_PORT;
} else {
impl->status_curr = MPP_OUTPUT_PORT;
impl->next_on_dequeue = MPP_OUTPUT_HOLD;
impl->next_on_enqueue = MPP_INPUT_PORT;
}
*port = (MppPort *)impl;
return MPP_OK;
}
static MPP_RET mpp_port_deinit(MppPort port)
{
mpp_free(port);
return MPP_OK;
}
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task)
{
MppPortImpl *port_impl = (MppPortImpl *)port;
MppTaskQueueImpl *queue = port_impl->queue;
AutoMutex auto_lock(queue->lock);
MppTaskStatusInfo *curr = &queue->info[port_impl->status_curr];
MppTaskStatusInfo *next = &queue->info[port_impl->next_on_dequeue];
*task = NULL;
if (curr->count == 0) {
mpp_assert(list_empty(&curr->list));
return MPP_OK;
}
MppTaskImpl *task_impl = list_entry(curr->list.next, MppTaskImpl, list);
MppTask p = (MppTask)task_impl;
check_mpp_task_name(p);
list_del_init(&task_impl->list);
curr->count--;
mpp_assert(curr->count >= 0);
list_add_tail(&task_impl->list, &next->list);
next->count++;
task_impl->status = next->status;
*task = p;
return MPP_OK;
}
MPP_RET mpp_port_enqueue(MppPort port, MppTask task)
{
MppTaskImpl *task_impl = (MppTaskImpl *)task;
MppPortImpl *port_impl = (MppPortImpl *)port;
MppTaskQueueImpl *queue = port_impl->queue;
check_mpp_task_name(task);
mpp_assert(task_impl->queue == (MppTaskQueue *)queue);
mpp_assert(task_impl->status == port_impl->next_on_dequeue);
AutoMutex auto_lock(queue->lock);
MppTaskStatusInfo *curr = &queue->info[task_impl->status];
MppTaskStatusInfo *next = &queue->info[port_impl->next_on_enqueue];
list_del_init(&task_impl->list);
curr->count--;
list_add_tail(&task_impl->list, &next->list);
next->count++;
task_impl->status = next->status;
return MPP_OK;
}
MPP_RET mpp_task_queue_init(MppTaskQueue *queue)
{
if (NULL == queue) {
mpp_err_f("invalid NULL input\n");
return MPP_ERR_NULL_PTR;
}
MppTaskQueueImpl *p = NULL;
MppTaskImpl *tasks = NULL;
Mutex *lock = NULL;
MppTaskStatus status = (type == MPP_PORT_INPUT) ? (MPP_EXTERNAL_HOLD) : (MPP_INTERNAL_HOLD);
do {
p = mpp_calloc(MppPortImpl, 1);
RK_S32 i;
p = mpp_calloc(MppTaskQueueImpl, 1);
if (NULL == p) {
mpp_err_f("malloc port failed\n");
mpp_err_f("malloc queue failed\n");
break;
}
lock = new Mutex();
@@ -76,30 +180,25 @@ MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count)
mpp_err_f("new lock failed\n");
break;;
}
tasks = mpp_calloc(MppTaskImpl, task_count);
if (NULL == tasks) {
mpp_err_f("malloc tasks list failed\n");
break;;
for (i = 0; i < MPP_TASK_STATUS_BUTT; i++) {
INIT_LIST_HEAD(&p->info[i].list);
p->info[i].count = 0;
p->info[i].status = (MppTaskStatus)i;
}
p->type = type;
p->task_count = task_count;
p->lock = lock;
p->tasks = tasks;
for (RK_U32 i = 0; i < MPP_TASK_BUTT; i++)
INIT_LIST_HEAD(&p->list[i]);
if (mpp_port_init(p, MPP_PORT_INPUT, &p->input))
break;
for (RK_S32 i = 0; i < task_count; i++) {
setup_mpp_task_name(&tasks[i]);
INIT_LIST_HEAD(&tasks[i].list);
tasks[i].index = i;
tasks[i].port = port;
tasks[i].status = status;
list_add_tail(&tasks[i].list, &p->list[status]);
p->count[status]++;
if (mpp_port_init(p, MPP_PORT_OUTPUT, &p->output)) {
mpp_port_deinit(p->input);
break;
}
*port = p;
*queue = p;
return MPP_OK;
} while (0);
@@ -110,133 +209,79 @@ MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count)
if (tasks)
mpp_free(tasks);
*port = NULL;
*queue = NULL;
return MPP_NOK;
}
MPP_RET mpp_port_deinit(MppPort port)
MPP_RET mpp_task_queue_setup(MppTaskQueue queue, RK_S32 task_count)
{
if (NULL == port) {
mpp_err_f("found NULL input port\n");
MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
AutoMutex auto_lock(impl->lock);
// NOTE: queue can only be setup once
mpp_assert(impl->tasks == NULL);
mpp_assert(impl->task_count == 0);
MppTaskImpl *tasks = mpp_calloc(MppTaskImpl, task_count);
if (NULL == tasks) {
mpp_err_f("malloc tasks list failed\n");
return MPP_ERR_MALLOC;
}
impl->tasks = tasks;
impl->task_count = task_count;
MppTaskStatusInfo *info = &impl->info[MPP_INPUT_PORT];
for (RK_S32 i = 0; i < task_count; i++) {
setup_mpp_task_name(&tasks[i]);
INIT_LIST_HEAD(&tasks[i].list);
tasks[i].index = i;
tasks[i].queue = (MppTaskQueue *)queue;
tasks[i].status = MPP_INPUT_PORT;
mpp_meta_get(&tasks[i].meta);
list_add_tail(&tasks[i].list, &info->list);
info->count++;
}
return MPP_OK;
}
MPP_RET mpp_task_queue_deinit(MppTaskQueue queue)
{
if (NULL == queue) {
mpp_err_f("found NULL input queue\n");
return MPP_ERR_NULL_PTR;
}
MppPortImpl *p = (MppPortImpl *)port;
if (p->tasks)
MppTaskQueueImpl *p = (MppTaskQueueImpl *)queue;
if (p->input) {
mpp_port_deinit(p->input);
p->input = NULL;
}
if (p->output) {
mpp_port_deinit(p->output);
p->output = NULL;
}
if (p->tasks) {
for (RK_S32 i = 0; i < p->task_count; i++) {
mpp_meta_put(p->tasks[i].meta);
}
mpp_free(p->tasks);
}
if (p->lock)
delete p->lock;
mpp_free(p);
return MPP_OK;
}
MPP_RET mpp_port_can_dequeue(MppPort port)
MppPort mpp_task_queue_get_port(MppTaskQueue queue, MppPortType type)
{
if (NULL == port) {
mpp_err_f("invalid input port %p\n", port);
return MPP_ERR_NULL_PTR;
}
MppPortImpl *p = (MppPortImpl *)port;
AutoMutex auto_lock(p->lock);
MppTaskStatus status_curr;
MppTaskStatus status_next;
return MPP_OK;
}
MPP_RET mpp_port_can_enqueue(MppPort port)
{
return MPP_OK;
}
static MppTaskImpl* mpp_task_get_by_status(MppPortImpl *p, MppTaskStatus status)
{
struct list_head *list = &p->list[status];
if (list_empty(list))
if (NULL == queue || type >= MPP_PORT_BUTT) {
mpp_err_f("invalid input queue %p type %d\n", queue, type);
return NULL;
MppTaskImpl *task = list_entry(list->next, MppTaskImpl, list);
mpp_assert(task->status == status);
list_del_init(&task->list);
return task;
}
static MPP_RET mpp_task_put_by_status(MppPortImpl *p, MppTaskStatus status, MppTaskImpl* task)
{
task->status = status;
list_add_tail(&task->list, &p->list[status]);
return MPP_OK;
}
MPP_RET mpp_port_dequeue(MppPort port, MppTask *task)
{
if (NULL == port || NULL == task) {
mpp_err_f("invalid input port %p task %d\n", port, task);
return MPP_ERR_UNKNOW;
}
MppPortImpl *p = (MppPortImpl *)port;
AutoMutex auto_lock(p->lock);
MppTaskStatus status_curr;
MppTaskStatus status_next;
switch (p->type) {
case MPP_PORT_INPUT : {
status_curr = MPP_EXTERNAL_QUEUE;
status_next = MPP_EXTERNAL_HOLD;
} break;
case MPP_PORT_OUTPUT : {
status_curr = MPP_INTERNAL_QUEUE;
status_next = MPP_INTERNAL_HOLD;
} break;
default : {
mpp_err_f("invalid queue type: %d\n", p->type);
return MPP_NOK;
} break;
}
MppTaskImpl *task_op = mpp_task_get_by_status(p, status_curr);
if (NULL == task_op)
return MPP_NOK;
mpp_task_put_by_status(p, status_next, task_op);
*task = (MppTask)task_op;
return MPP_OK;
}
MPP_RET mpp_port_enqueue(MppPort port, MppTask task)
{
if (NULL == port || NULL == task) {
mpp_err_f("invalid input port %p task %d\n", port, task);
return MPP_ERR_UNKNOW;
}
MppPortImpl *p = (MppPortImpl *)port;
AutoMutex auto_lock(p->lock);
MppTaskStatus status_curr;
MppTaskStatus status_next;
switch (p->type) {
case MPP_PORT_INPUT : {
status_curr = MPP_INTERNAL_HOLD;
status_next = MPP_EXTERNAL_QUEUE;
} break;
case MPP_PORT_OUTPUT : {
status_curr = MPP_EXTERNAL_HOLD;
status_next = MPP_INTERNAL_QUEUE;
} break;
default : {
mpp_err_f("invalid queue type: %d\n", p->type);
return MPP_NOK;
} break;
}
MppTaskImpl *task_op = (MppTaskImpl *)task;
mpp_assert(task_op->status == status_curr);
list_del_init(&task_op->list);
mpp_task_put_by_status(p, status_next, task_op);
return MPP_OK;
MppTaskQueueImpl *impl = (MppTaskQueueImpl *)queue;
return (type == MPP_PORT_INPUT) ? (impl->input) : (impl->output);
}

View File

@@ -17,12 +17,19 @@
#ifndef __MPP_ENC_H__
#define __MPP_ENC_H__
#include "rk_mpi.h"
#include "mpp_hal.h"
typedef struct MppEnc_t MppEnc;
struct MppEnc_t {
MppCodingType coding;
MppHal hal;
// common resource
MppBufSlots frame_slots;
MppBufSlots packet_slots;
HalTaskGroup tasks;
};
#ifdef __cplusplus

View File

@@ -29,26 +29,42 @@ void *mpp_enc_control_thread(void *data)
{
Mpp *mpp = (Mpp*)data;
MppThread *thd_enc = mpp->mThreadCodec;
mpp_list *packets = mpp->mPackets;
mpp_list *frames = mpp->mFrames;
MppFrameImpl frame;
MppPacket packet;
size_t size = SZ_1M;
char *buf = mpp_malloc(char, size);
MppPort input = mpp_task_queue_get_port(mpp->mInputTaskQueue, MPP_PORT_OUTPUT);
MppPort output = mpp_task_queue_get_port(mpp->mOutputTaskQueue, MPP_PORT_INPUT);
MppTask task = NULL;
MPP_RET ret = MPP_OK;
while (MPP_THREAD_RUNNING == thd_enc->get_status()) {
AutoMutex auto_lock(frames->mutex());
if (frames->list_size()) {
frames->del_at_head(&frame, sizeof(frame));
thd_enc->lock();
ret = mpp_port_dequeue(input, &task);
if (ret || NULL == task)
thd_enc->wait();
thd_enc->unlock();
mpp_packet_init(&packet, buf, size);
packets->lock();
packets->add_at_tail(&packet, sizeof(packet));
packets->signal();
packets->unlock();
if (task) {
MppFrame frame = NULL;
MppPacket packet = NULL;
// task process here
// enqueue task back to input input
mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame);
mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet);
mpp_port_enqueue(input, task);
task = NULL;
// send finished task to output port
mpp_port_dequeue(output, &task);
mpp_task_meta_set_frame(task, MPP_META_KEY_INPUT_FRM, frame);
mpp_task_meta_set_packet(task, MPP_META_KEY_OUTPUT_PKT, packet);
// setup output task here
mpp_port_enqueue(output, task);
task = NULL;
}
}
mpp_free(buf);
return NULL;
}
@@ -123,7 +139,29 @@ MPP_RET mpp_enc_init(MppEnc **enc, MppCodingType coding)
return MPP_ERR_NULL_PTR;
}
MPP_RET ret = MPP_NOK;
MppHal hal = NULL;
MppHalCfg hal_cfg = {
MPP_CTX_ENC,
coding,
HAL_MODE_LIBVPU,
HAL_RKVDEC,
NULL,
NULL,
NULL,
2,
0,
NULL,
};
ret = mpp_hal_init(&hal, &hal_cfg);
if (ret) {
mpp_err_f("could not init hal\n");
}
p->coding = coding;
*enc = p;
return MPP_OK;
@@ -136,6 +174,10 @@ MPP_RET mpp_enc_deinit(MppEnc *enc)
return MPP_ERR_NULL_PTR;
}
MPP_RET ret = mpp_hal_deinit(enc->hal);
if (ret) {
mpp_err_f("mpp enc hal deinit failed\n");
}
mpp_free(enc);
return MPP_OK;
}

View File

@@ -43,6 +43,7 @@ static MppCodingTypeInfo support_list[] = {
{ MPP_CTX_DEC, MPP_VIDEO_CodingVP8, "dec", "vp8", },
{ MPP_CTX_DEC, MPP_VIDEO_CodingVP9, "dec", "VP9", },
{ MPP_CTX_DEC, MPP_VIDEO_CodingAVS, "dec", "avs+", },
{ MPP_CTX_ENC, MPP_VIDEO_CodingAVC, "enc", "h.264/AVC", },
};
#define check_mpp_ctx(ctx) _check_mpp_ctx(ctx, __FUNCTION__)
@@ -222,6 +223,8 @@ static MPP_RET mpi_dequeue(MppCtx ctx, MppPortType type, MppTask *task)
return MPP_ERR_UNKNOW;
}
ret = p->ctx->dequeue(type, task);
MPI_FUNCTION_LEAVE();
return ret;
}
@@ -235,11 +238,13 @@ static MPP_RET mpi_enqueue(MppCtx ctx, MppPortType type, MppTask task)
if (ret)
return ret;
if (type >= MPP_PORT_BUTT || task < 0) {
mpp_err_f("invalid input type %d task %d\n", type, task);
if (type >= MPP_PORT_BUTT || NULL == task) {
mpp_err_f("invalid input type %d task %p\n", type, task);
return MPP_ERR_UNKNOW;
}
ret = p->ctx->enqueue(type, task);
MPI_FUNCTION_LEAVE();
return ret;
}

View File

@@ -25,9 +25,9 @@
#include "mpp_dec.h"
#include "mpp_enc.h"
#include "mpp_hal.h"
#include "mpp_task_impl.h"
#include "mpp_buffer_impl.h"
#include "mpp_frame_impl.h"
#include "mpp_packet.h"
#include "mpp_packet_impl.h"
#define MPP_TEST_FRAME_SIZE SZ_1M
@@ -45,6 +45,10 @@ Mpp::Mpp()
mTaskGetCount(0),
mPacketGroup(NULL),
mFrameGroup(NULL),
mInputPort(NULL),
mOutputPort(NULL),
mInputTaskQueue(NULL),
mOutputTaskQueue(NULL),
mThreadCodec(NULL),
mThreadHal(NULL),
mDec(NULL),
@@ -91,7 +95,6 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding)
mpp_buffer_group_get_internal(&mPacketGroup, MPP_BUFFER_TYPE_ION);
mpp_buffer_group_limit_config(mPacketGroup, 0, 3);
} break;
case MPP_CTX_ENC : {
mFrames = new mpp_list((node_destructor)NULL);
@@ -110,6 +113,14 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding)
} break;
}
mpp_task_queue_init(&mInputTaskQueue);
mpp_task_queue_init(&mOutputTaskQueue);
mpp_task_queue_setup(mInputTaskQueue, 4);
mpp_task_queue_setup(mOutputTaskQueue, 4);
mInputPort = mpp_task_queue_get_port(mInputTaskQueue, MPP_PORT_INPUT);
mOutputPort = mpp_task_queue_get_port(mOutputTaskQueue, MPP_PORT_OUTPUT);
if (mFrames && mPackets &&
(mDec || mEnc) &&
mThreadCodec && mThreadHal &&
@@ -150,6 +161,19 @@ void Mpp::clear()
delete mThreadHal;
mThreadHal = NULL;
}
if (mInputTaskQueue) {
mpp_task_queue_deinit(mInputTaskQueue);
mInputTaskQueue = NULL;
}
if (mOutputTaskQueue) {
mpp_task_queue_deinit(mOutputTaskQueue);
mOutputTaskQueue = NULL;
}
mInputPort = NULL;
mOutputPort = NULL;
if (mDec || mEnc) {
if (mType == MPP_CTX_DEC) {
mpp_dec_deinit(mDec);
@@ -273,6 +297,63 @@ MPP_RET Mpp::get_packet(MppPacket *packet)
return MPP_OK;
}
MPP_RET Mpp::dequeue(MppPortType type, MppTask *task)
{
if (!mInitDone)
return MPP_NOK;
MPP_RET ret = MPP_NOK;
AutoMutex autoLock(mPortLock);
MppTaskQueue port = NULL;
switch (type) {
case MPP_PORT_INPUT : {
port = mInputPort;
} break;
case MPP_PORT_OUTPUT : {
port = mOutputPort;
} break;
default : {
} break;
}
if (port)
ret = mpp_port_dequeue(port, task);
return ret;
}
MPP_RET Mpp::enqueue(MppPortType type, MppTask task)
{
if (!mInitDone)
return MPP_NOK;
MPP_RET ret = MPP_NOK;
AutoMutex autoLock(mPortLock);
MppTaskQueue port = NULL;
switch (type) {
case MPP_PORT_INPUT : {
port = mInputPort;
} break;
case MPP_PORT_OUTPUT : {
port = mOutputPort;
} break;
default : {
} break;
}
if (port) {
mThreadCodec->lock();
ret = mpp_port_enqueue(port, task);
if (MPP_OK == ret) {
// if enqueue success wait up thread
mThreadCodec->signal();
}
mThreadCodec->unlock();
}
return ret;
}
MPP_RET Mpp::control(MpiCmd cmd, MppParam param)
{
switch (cmd) {

View File

@@ -20,6 +20,7 @@
#include "mpp_list.h"
#include "mpp_dec.h"
#include "mpp_enc.h"
#include "mpp_task.h"
#define MPP_DBG_FUNCTION (0x00000001)
#define MPP_DBG_PACKET (0x00000002)
@@ -70,6 +71,9 @@ public:
MPP_RET put_frame(MppFrame frame);
MPP_RET get_packet(MppPacket *packet);
MPP_RET dequeue(MppPortType type, MppTask *task);
MPP_RET enqueue(MppPortType type, MppTask task);
MPP_RET reset();
MPP_RET control(MpiCmd cmd, MppParam param);
@@ -94,6 +98,16 @@ public:
MppBufferGroup mPacketGroup;
MppBufferGroup mFrameGroup;
/*
* Mpp task queue for advance task mode
*/
Mutex mPortLock;
MppPort mInputPort;
MppPort mOutputPort;
MppTaskQueue mInputTaskQueue;
MppTaskQueue mOutputTaskQueue;
/*
* There are two threads for each decoder/encoder: codec thread and hal thread
*

View File

@@ -40,3 +40,6 @@ add_mpp_test(mpi)
# mpi decoder unit test
add_mpp_test(mpi_dec)
# mpi encoder unit test
add_mpp_test(mpi_enc)

View File

@@ -75,9 +75,6 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
MppBuffer frm_buf[MPI_ENC_IO_COUNT] = { NULL };
MppBuffer pkt_buf[MPI_ENC_IO_COUNT] = { NULL };
MpiCmd mpi_cmd = MPP_CMD_BASE;
MppParam param = NULL;
// paramter for resource malloc
RK_U32 width = cmd->width;
RK_U32 height = cmd->height;
@@ -162,6 +159,7 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
mpp_frame_set_hor_stride(frame, hor_stride);
mpp_frame_set_ver_stride(frame, ver_stride);
i = 0;
while (!pkt_eos) {
MppTask task = NULL;
RK_S32 index = i++;
@@ -176,12 +174,13 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
if (read_size != frame_size || feof(fp_input)) {
mpp_log("found last frame\n");
frm_eos = 1;
break;
}
mpp_frame_set_buffer(frame, frm_buf_in);
mpp_frame_set_eos(frame, frm_eos);
mpp_packet_init_with_buffer(packet, pkt_buf_out);
mpp_packet_init_with_buffer(&packet, pkt_buf_out);
ret = mpi->dequeue(ctx, MPP_PORT_INPUT, &task);
if (ret) {
@@ -198,8 +197,9 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
goto MPP_TEST_OUT;
}
msleep(200);
msleep(20);
do {
ret = mpi->dequeue(ctx, MPP_PORT_OUTPUT, &task);
if (ret) {
mpp_err("mpp task output dequeue failed\n");
@@ -207,23 +207,32 @@ int mpi_enc_test(MpiEncTestCmd *cmd)
}
if (task) {
mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame);
mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet);
MppFrame frame_out = NULL;
MppFrame packet_out = NULL;
mpp_task_meta_get_frame (task, MPP_META_KEY_INPUT_FRM, &frame_out);
mpp_task_meta_get_packet(task, MPP_META_KEY_OUTPUT_PKT, &packet_out);
mpp_assert(packet_out == packet);
mpp_assert(frame_out == frame);
if (packet) {
// write packet to file here
{
void *ptr = mpp_packet_get_pos(packet);
size_t len = mpp_packet_get_length(packet);
fwrite(ptr, 1, len, fp_output);
}
// void *ptr = mpp_packet_get_pos(packet);
// size_t len = mpp_packet_get_length(packet);
// fwrite(ptr, 1, len, fp_output);
mpp_packet_deinit(&packet);
}
mpp_log_f("encoded frame %d\n", frame_count);
frame_count++;
ret = mpi->enqueue(ctx, MPP_PORT_OUTPUT, task);
if (ret) {
mpp_err("mpp task output enqueue failed\n");
goto MPP_TEST_OUT;
}
break;
}
} while (1);
}
ret = mpi->reset(ctx);
@@ -239,14 +248,19 @@ MPP_TEST_OUT:
ctx = NULL;
}
if (frame) {
mpp_frame_deinit(&frame);
frame = NULL;
}
for (i = 0; i < MPI_ENC_IO_COUNT; i++) {
if (frm_buf[i]) {
mpp_buffer_put(&frm_buf[i]);
mpp_buffer_put(frm_buf[i]);
frm_buf[i] = NULL;
}
if (pkt_buf[i]) {
mpp_buffer_put(&pkt_buf[i]);
mpp_buffer_put(pkt_buf[i]);
pkt_buf[i] = NULL;
}
}