From ef3a1e5c037d56c603a56e0696edb3a1a158124c Mon Sep 17 00:00:00 2001 From: ChenHengming Date: Fri, 15 Jul 2016 02:57:03 +0000 Subject: [PATCH] [mpp_task]: add mpp_task.cpp and mpp_task_impl.cpp git-svn-id: https://10.10.10.66:8443/svn/MediaProcessPlatform/trunk/mpp@1029 6e48237b-75ef-9749-8fc9-41990f28c85a --- mpp/base/inc/mpp_task_impl.h | 100 +++++++++++++++ mpp/base/mpp_task.cpp | 149 ++++++++++++++++++++++ mpp/base/mpp_task_impl.cpp | 241 +++++++++++++++++++++++++++++++++++ 3 files changed, 490 insertions(+) create mode 100644 mpp/base/inc/mpp_task_impl.h create mode 100644 mpp/base/mpp_task.cpp create mode 100644 mpp/base/mpp_task_impl.cpp diff --git a/mpp/base/inc/mpp_task_impl.h b/mpp/base/inc/mpp_task_impl.h new file mode 100644 index 00000000..fc816b6f --- /dev/null +++ b/mpp/base/inc/mpp_task_impl.h @@ -0,0 +1,100 @@ +/* + * Copyright 2015 Rockchip Electronics Co. LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MPP_TASK_IMPL_H__ +#define __MPP_TASK_IMPL_H__ + +#include "mpp_list.h" +#include "mpp_meta.h" + +/* + * mpp task status transaction + * + * mpp task is generated from mpp port. When create a mpp port the corresponding task + * will be created, too. Then external user will dequeue task from port and enqueue to + * mpp port for process. + * + * input port task work flow: + * + * description | call | status transaction + * 1. input port init | enqueue(external) | -> external_queue + * 2. input port user dequeue | dequeue(external) | external_queue -> external_hold + * 3. user setup task for processing| | + * 4. input port user enqueue | enqueue(external) | external_hold -> internal_queue + * 5. input port mpp start process | dequeue(internal) | internal_queue -> internal_hold + * 6. mpp process task | | + * 7. input port mpp task finish | enqueue(internal) | internal_hold -> external_queue + * loop to 2 + * + * output port task work flow: + * description | call | status transaction + * 1. output port init | enqueue(internal) | -> internal_queue + * 2. output port mpp task dequeue | dequeue(internal) | internal_queue -> internal_hold + * 3. mpp setup task by processed frame/packet | + * 4. output port mpp task enqueue | enqueue(internal) | internal_hold -> external_queue + * 5. output port user task dequeue | dequeue(external) | external_queue -> external_hold + * 6. user get task as output | | + * 7. output port user release task | enqueue(external) | external_hold -> external_queue + * loop to 2 + * + */ +typedef enum MppTaskStatus_e { + MPP_EXTERNAL_QUEUE, /* in external queue and ready for external dequeue */ + MPP_EXTERNAL_HOLD, /* dequeued and hold by external user, user will config */ + MPP_INTERNAL_QUEUE, /* in mpp internal work queue and ready for mpp dequeue */ + MPP_INTERNAL_HOLD, /* dequeued and hold by mpp internal worker, mpp is processing */ + MPP_TASK_BUTT, +} MppTaskStatus; + +typedef void* MppPort; + +typedef struct MppTaskImpl_t { + const char *name; + struct list_head list; + MppPort *port; + RK_S32 index; + MppTaskStatus status; + + MppMeta meta; +} MppTaskImpl; + +#ifdef __cplusplus +extern "C" { +#endif + +MPP_RET check_mpp_task_name(MppTask task); + +/* + * mpp_port_init: + * initialize port with task count and initial status + * group - return port pointer + * type - initial queue for all tasks + * task_count - total task count for this task group + */ +MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count); +MPP_RET mpp_port_deinit(MppPort port); + +MPP_RET mpp_port_can_dequeue(MppPort port); +MPP_RET mpp_port_can_enqueue(MppPort port); + +MPP_RET mpp_port_dequeue(MppPort port, MppTask *task); +MPP_RET mpp_port_enqueue(MppPort port, MppTask task); + +#ifdef __cplusplus +} +#endif + +#endif /*__MPP_TASK_IMPL_H__*/ diff --git a/mpp/base/mpp_task.cpp b/mpp/base/mpp_task.cpp new file mode 100644 index 00000000..b38d91c2 --- /dev/null +++ b/mpp/base/mpp_task.cpp @@ -0,0 +1,149 @@ +/* + * Copyright 2015 Rockchip Electronics Co. LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define MODULE_TAG "mpp_task" + +#include + +#include "mpp_task.h" +#include "mpp_task_impl.h" + +MPP_RET mpp_task_meta_set_s32(MppTask task, MppMetaKey key, RK_S32 val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_s32(impl->meta, key, val); +} + +MPP_RET mpp_task_meta_set_s64(MppTask task, MppMetaKey key, RK_S64 val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_s64(impl->meta, key, val); +} + +MPP_RET mpp_task_meta_set_ptr(MppTask task, MppMetaKey key, void *val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_ptr(impl->meta, key, val); +} + +MPP_RET mpp_task_meta_set_frame(MppTask task, MppMetaKey key, MppFrame frame) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_frame(impl->meta, key, frame); +} + +MPP_RET mpp_task_meta_set_packet(MppTask task, MppMetaKey key, MppPacket packet) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_packet(impl->meta, key, packet); +} + +MPP_RET mpp_task_meta_set_buffer(MppTask task, MppMetaKey key, MppBuffer buffer) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + return mpp_meta_set_buffer(impl->meta, key, buffer); +} + +MPP_RET mpp_task_meta_get_s32(MppTask task, MppMetaKey key, RK_S32 *val, RK_S32 default_val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_s32(impl->meta, key, val); + if (ret) + *val = default_val; + return ret; +} + +MPP_RET mpp_task_meta_get_s64(MppTask task, MppMetaKey key, RK_S64 *val, RK_S64 default_val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_s64(impl->meta, key, val); + if (ret) + *val = default_val; + return ret; +} + +MPP_RET mpp_task_meta_get_ptr(MppTask task, MppMetaKey key, void **val, void *default_val) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_ptr(impl->meta, key, val); + if (ret) + *val = default_val; + return ret; +} + +MPP_RET mpp_task_meta_get_frame(MppTask task, MppMetaKey key, MppFrame *frame) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_frame(impl->meta, key, frame); + if (ret) + *frame = NULL; + return ret; +} + +MPP_RET mpp_task_meta_get_packet(MppTask task, MppMetaKey key, MppPacket *packet) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_packet(impl->meta, key, packet); + if (ret) + *packet = NULL; + return ret; +} + +MPP_RET mpp_task_meta_get_buffer(MppTask task, MppMetaKey key, MppBuffer *buffer) +{ + if (check_mpp_task_name(task)) + return MPP_NOK; + + MppTaskImpl *impl = (MppTaskImpl *)task; + MPP_RET ret = mpp_meta_get_buffer(impl->meta, key, buffer); + if (ret) + *buffer = NULL; + return ret; +} + diff --git a/mpp/base/mpp_task_impl.cpp b/mpp/base/mpp_task_impl.cpp new file mode 100644 index 00000000..65c03ae8 --- /dev/null +++ b/mpp/base/mpp_task_impl.cpp @@ -0,0 +1,241 @@ +/* + * Copyright 2015 Rockchip Electronics Co. LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define MODULE_TAG "mpp_task_impl" + +#include + +#include "mpp_log.h" +#include "mpp_mem.h" +#include "mpp_task_impl.h" + +#define MAX_TASK_COUNT 8 + +typedef struct MppTaskGroupImpl_t { + MppPortType type; + RK_S32 task_count; + Mutex *lock; + + MppTaskImpl *tasks; + + struct list_head list[MPP_TASK_BUTT]; + RK_S32 count[MPP_TASK_BUTT]; +} MppPortImpl; + +static const char *module_name = MODULE_TAG; + +void setup_mpp_task_name(MppTaskImpl *task) +{ + task->name = module_name; +} + +MPP_RET check_mpp_task_name(MppTask task) +{ + if (task && ((MppTaskImpl *)task)->name == module_name) + return MPP_OK; + + mpp_err_f("pointer %p failed on check\n", task); + mpp_abort(); + return MPP_NOK; +} + +MPP_RET mpp_port_init(MppPort *port, MppPortType type, RK_S32 task_count) +{ + if (NULL == port || type >= MPP_PORT_BUTT || task_count > MAX_TASK_COUNT) { + mpp_err_f("invalid input port %p type %d count %d\n", port, type, task_count); + return MPP_ERR_UNKNOW; + } + + MppPortImpl *p = NULL; + MppTaskImpl *tasks = NULL; + Mutex *lock = NULL; + MppTaskStatus status = (type == MPP_PORT_INPUT) ? (MPP_EXTERNAL_HOLD) : (MPP_INTERNAL_HOLD); + + do { + p = mpp_calloc(MppPortImpl, 1); + if (NULL == p) { + mpp_err_f("malloc port failed\n"); + break; + } + lock = new Mutex(); + if (NULL == lock) { + mpp_err_f("new lock failed\n"); + break;; + } + tasks = mpp_calloc(MppTaskImpl, task_count); + if (NULL == tasks) { + mpp_err_f("malloc tasks list failed\n"); + break;; + } + + p->type = type; + p->task_count = task_count; + p->lock = lock; + p->tasks = tasks; + + for (RK_U32 i = 0; i < MPP_TASK_BUTT; i++) + INIT_LIST_HEAD(&p->list[i]); + + for (RK_S32 i = 0; i < task_count; i++) { + setup_mpp_task_name(&tasks[i]); + INIT_LIST_HEAD(&tasks[i].list); + tasks[i].index = i; + tasks[i].port = port; + tasks[i].status = status; + list_add_tail(&tasks[i].list, &p->list[status]); + p->count[status]++; + } + *port = p; + return MPP_OK; + } while (0); + + if (p) + mpp_free(p); + if (lock) + delete lock; + if (tasks) + mpp_free(tasks); + + *port = NULL; + return MPP_NOK; +} + +MPP_RET mpp_port_deinit(MppPort port) +{ + if (NULL == port) { + mpp_err_f("found NULL input port\n"); + return MPP_ERR_NULL_PTR; + } + + MppPortImpl *p = (MppPortImpl *)port; + if (p->tasks) + mpp_free(p->tasks); + if (p->lock) + delete p->lock; + mpp_free(p); + return MPP_OK; +} + +MPP_RET mpp_port_can_dequeue(MppPort port) +{ + if (NULL == port) { + mpp_err_f("invalid input port %p\n", port); + return MPP_ERR_NULL_PTR; + } + + MppPortImpl *p = (MppPortImpl *)port; + AutoMutex auto_lock(p->lock); + MppTaskStatus status_curr; + MppTaskStatus status_next; + + return MPP_OK; +} + +MPP_RET mpp_port_can_enqueue(MppPort port) +{ + return MPP_OK; +} + +static MppTaskImpl* mpp_task_get_by_status(MppPortImpl *p, MppTaskStatus status) +{ + struct list_head *list = &p->list[status]; + if (list_empty(list)) + return NULL; + + MppTaskImpl *task = list_entry(list->next, MppTaskImpl, list); + mpp_assert(task->status == status); + list_del_init(&task->list); + return task; +} + +static MPP_RET mpp_task_put_by_status(MppPortImpl *p, MppTaskStatus status, MppTaskImpl* task) +{ + task->status = status; + list_add_tail(&task->list, &p->list[status]); + return MPP_OK; +} + +MPP_RET mpp_port_dequeue(MppPort port, MppTask *task) +{ + if (NULL == port || NULL == task) { + mpp_err_f("invalid input port %p task %d\n", port, task); + return MPP_ERR_UNKNOW; + } + + MppPortImpl *p = (MppPortImpl *)port; + AutoMutex auto_lock(p->lock); + MppTaskStatus status_curr; + MppTaskStatus status_next; + + switch (p->type) { + case MPP_PORT_INPUT : { + status_curr = MPP_EXTERNAL_QUEUE; + status_next = MPP_EXTERNAL_HOLD; + } break; + case MPP_PORT_OUTPUT : { + status_curr = MPP_INTERNAL_QUEUE; + status_next = MPP_INTERNAL_HOLD; + } break; + default : { + mpp_err_f("invalid queue type: %d\n", p->type); + return MPP_NOK; + } break; + } + + MppTaskImpl *task_op = mpp_task_get_by_status(p, status_curr); + if (NULL == task_op) + return MPP_NOK; + + mpp_task_put_by_status(p, status_next, task_op); + *task = (MppTask)task_op; + return MPP_OK; +} + +MPP_RET mpp_port_enqueue(MppPort port, MppTask task) +{ + if (NULL == port || NULL == task) { + mpp_err_f("invalid input port %p task %d\n", port, task); + return MPP_ERR_UNKNOW; + } + + MppPortImpl *p = (MppPortImpl *)port; + AutoMutex auto_lock(p->lock); + MppTaskStatus status_curr; + MppTaskStatus status_next; + + switch (p->type) { + case MPP_PORT_INPUT : { + status_curr = MPP_INTERNAL_HOLD; + status_next = MPP_EXTERNAL_QUEUE; + } break; + case MPP_PORT_OUTPUT : { + status_curr = MPP_EXTERNAL_HOLD; + status_next = MPP_INTERNAL_QUEUE; + } break; + default : { + mpp_err_f("invalid queue type: %d\n", p->type); + return MPP_NOK; + } break; + } + + MppTaskImpl *task_op = (MppTaskImpl *)task; + mpp_assert(task_op->status == status_curr); + list_del_init(&task_op->list); + + mpp_task_put_by_status(p, status_next, task_op); + return MPP_OK; +} +