[mpp/osal]: add codec thread and hal thread

1. add MppThread / Mutex / Condition class
2. add internal flag to MppBuffer
3. change MppBufferService from structure to class
4. add mpp_buffer_put on mpp_frame_put
5. add coding type to Mpp initial function
6. mpp codec / hal thread basic flow done, but reset is not added

git-svn-id: https://10.10.10.66:8443/svn/MediaProcessPlatform/trunk/mpp@168 6e48237b-75ef-9749-8fc9-41990f28c85a
This commit is contained in:
ChenHengming
2015-08-26 12:46:38 +00:00
parent 174c68fb5e
commit 3612ed5876
9 changed files with 368 additions and 209 deletions

View File

@@ -184,7 +184,7 @@ MPP_RET mpp_init(MppCtx *ctx, MppApi **mpi, MppCtxType type, MppCodingType codin
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
p->ctx = new Mpp(type); p->ctx = new Mpp(type, coding);
if (NULL == p->ctx) { if (NULL == p->ctx) {
mpp_free(p); mpp_free(p);
mpp_err("mpp_init failed to new Mpp\n"); mpp_err("mpp_init failed to new Mpp\n");

View File

@@ -22,24 +22,34 @@
#include "mpp.h" #include "mpp.h"
#include "mpp_frame_impl.h" #include "mpp_frame_impl.h"
#include "mpp_buffer.h"
#include "mpp_packet.h" #include "mpp_packet.h"
#include "mpp_packet_impl.h" #include "mpp_packet_impl.h"
static void *thread_hal(void *data) #define MPP_TEST_FRAME_SIZE SZ_1M
void *thread_hal(void *data)
{ {
Mpp *mpp = (Mpp*)data; Mpp *mpp = (Mpp*)data;
MppThread *thd_dec = mpp->thd_codec; MppThread *codec = mpp->mTheadCodec;
MppThread *thd_hal = mpp->thd_hal; MppThread *hal = mpp->mThreadHal;
mpp_list *packets = mpp->packets; mpp_list *frames = mpp->mFrames;
mpp_list *frames = mpp->frames; mpp_list *tasks = mpp->mTasks;
while (MPP_THREAD_RUNNING == thd_hal->get_status()) { while (MPP_THREAD_RUNNING == hal->get_status()) {
/* /*
* hal thread wait for dxva interface intput firt * hal thread wait for dxva interface intput firt
*/ */
hal->lock();
if (0 == tasks->list_size())
hal->wait();
hal->unlock();
// get_config // get_config
// register genertation // register genertation
if (tasks->list_size()) {
MppHalDecTask *task;
mpp->mTasks->del_at_head(&task, sizeof(task));
mpp->mTaskGetCount++;
/* /*
* wait previous register set done * wait previous register set done
@@ -58,7 +68,16 @@ static void *thread_hal(void *data)
// mark frame in output queue // mark frame in output queue
// wait up output thread to get a output frame // wait up output thread to get a output frame
msleep(10); // for test
MppBuffer buffer;
mpp_buffer_get(mpp->mFrameGroup, &buffer, MPP_TEST_FRAME_SIZE);
MppFrame frame;
mpp_frame_init(&frame);
mpp_frame_set_buffer(frame, buffer);
frames->add_at_tail(&frame, sizeof(frame));
mpp->mFramePutCount++;
}
} }
return NULL; return NULL;
@@ -67,20 +86,34 @@ static void *thread_hal(void *data)
static void *thread_dec(void *data) static void *thread_dec(void *data)
{ {
Mpp *mpp = (Mpp*)data; Mpp *mpp = (Mpp*)data;
MppThread *thd_dec = mpp->thd_codec; MppThread *dec = mpp->mTheadCodec;
MppThread *thd_hal = mpp->thd_hal; MppThread *hal = mpp->mThreadHal;
mpp_list *packets = mpp->packets; mpp_list *packets = mpp->mPackets;
mpp_list *frames = mpp->frames; mpp_list *frames = mpp->mFrames;
MppPacketImpl packet; MppPacketImpl packet;
MppFrame frame;
while (MPP_THREAD_RUNNING == thd_dec->get_status()) { while (MPP_THREAD_RUNNING == dec->get_status()) {
RK_U32 packet_ready = 0;
/*
* wait for stream input
*/
dec->lock();
if (0 == packets->list_size())
dec->wait();
dec->unlock();
if (packets->list_size()) { if (packets->list_size()) {
mpp->mPacketLock.lock();
/* /*
* packet will be destroyed outside, here just copy the content * packet will be destroyed outside, here just copy the content
*/ */
packets->del_at_head(&packet, sizeof(packet)); packets->del_at_head(&packet, sizeof(packet));
mpp->mPacketGetCount++;
packet_ready = 1;
mpp->mPacketLock.unlock();
}
if (packet_ready) {
/* /*
* 1. send packet data to parser * 1. send packet data to parser
* *
@@ -106,7 +139,9 @@ static void *thread_dec(void *data)
* frame to hal loop. * frame to hal loop.
*/ */
// mpp->get_buffer //MppBuffer buffer;
//mpp_buffer_get(mpp->mFrameGroup, &buffer, MPP_TEST_FRAME_SIZE);
/* /*
* 3. send dxva output information and buffer information to hal thread * 3. send dxva output information and buffer information to hal thread
@@ -115,11 +150,9 @@ static void *thread_dec(void *data)
// hal->wait_prev_done; // hal->wait_prev_done;
// hal->send_config; // hal->send_config;
mpp->mTasks->add_at_tail(&mpp->mTask[0], sizeof(mpp->mTask[0]));
mpp->mTaskPutCount++;
// for test hal->signal();
mpp_frame_init(&frame);
frames->add_at_tail(&frame, sizeof(frame));
} }
} }
@@ -129,14 +162,16 @@ static void *thread_dec(void *data)
static void *thread_enc(void *data) static void *thread_enc(void *data)
{ {
Mpp *mpp = (Mpp*)data; Mpp *mpp = (Mpp*)data;
mpp_list *packets = mpp->packets; MppThread *thd_enc = mpp->mTheadCodec;
mpp_list *frames = mpp->frames; MppThread *thd_hal = mpp->mThreadHal;
mpp_list *packets = mpp->mPackets;
mpp_list *frames = mpp->mFrames;
MppFrameImpl frame; MppFrameImpl frame;
MppPacket packet; MppPacket packet;
size_t size = SZ_1M; size_t size = SZ_1M;
char *buf = mpp_malloc(char, size); char *buf = mpp_malloc(char, size);
while (mpp->thread_codec_running) { while (MPP_THREAD_RUNNING == thd_enc->get_status()) {
if (frames->list_size()) { if (frames->list_size()) {
frames->del_at_head(&frame, sizeof(frame)); frames->del_at_head(&frame, sizeof(frame));
@@ -148,102 +183,147 @@ static void *thread_enc(void *data)
return NULL; return NULL;
} }
Mpp::Mpp(MppCtxType type) Mpp::Mpp(MppCtxType type, MppCodingType coding)
: packets(NULL), : mPackets(NULL),
frames(NULL), mFrames(NULL),
thd_codec(NULL), mTasks(NULL),
thd_hal(NULL), mPacketPutCount(0),
thread_codec_running(0), mPacketGetCount(0),
thread_codec_reset(0), mFramePutCount(0),
status(0) mFrameGetCount(0),
mTaskPutCount(0),
mTaskGetCount(0),
mPacketGroup(NULL),
mFrameGroup(NULL),
mTheadCodec(NULL),
mThreadHal(NULL),
mType(type),
mCoding(coding),
mStatus(0),
mTask(NULL),
mTaskNum(2)
{ {
switch (type) { switch (mType) {
case MPP_CTX_DEC : { case MPP_CTX_DEC : {
packets = new mpp_list((node_destructor)NULL); mPackets = new mpp_list((node_destructor)NULL);
frames = new mpp_list((node_destructor)mpp_frame_deinit); mFrames = new mpp_list((node_destructor)mpp_frame_deinit);
thd_codec = new MppThread(thread_dec, this); mTasks = new mpp_list((node_destructor)NULL);
thd_hal = new MppThread(thread_hal, this); mTheadCodec = new MppThread(thread_dec, this);
mThreadHal = new MppThread(thread_hal, this);
mTask = mpp_malloc(MppHalDecTask*, mTaskNum);
mpp_buffer_group_normal_get(&mPacketGroup, MPP_BUFFER_TYPE_NORMAL);
mpp_buffer_group_limited_get(&mFrameGroup, MPP_BUFFER_TYPE_ION);
mpp_buffer_group_limit_config(mFrameGroup, 4, MPP_TEST_FRAME_SIZE);
} break; } break;
case MPP_CTX_ENC : { case MPP_CTX_ENC : {
frames = new mpp_list((node_destructor)NULL); mFrames = new mpp_list((node_destructor)NULL);
packets = new mpp_list((node_destructor)mpp_packet_deinit); mPackets = new mpp_list((node_destructor)mpp_packet_deinit);
thd_codec = new MppThread(thread_enc, this); mTasks = new mpp_list((node_destructor)NULL);
thd_hal = new MppThread(thread_hal, this); mTheadCodec = new MppThread(thread_enc, this);
mThreadHal = new MppThread(thread_hal, this);
mTask = mpp_malloc(MppHalDecTask*, mTaskNum);
mpp_buffer_group_normal_get(&mPacketGroup, MPP_BUFFER_TYPE_NORMAL);
mpp_buffer_group_limited_get(&mFrameGroup, MPP_BUFFER_TYPE_ION);
} break; } break;
default : { default : {
mpp_err("Mpp error type %d\n", type); mpp_err("Mpp error type %d\n", mType);
} break; } break;
} }
if (packets && frames && thd_codec && thd_hal) { if (mFrames && mPackets && mTask &&
thd_codec->start(); mTheadCodec && mThreadHal &&
thd_hal->start(); mPacketGroup && mFrameGroup) {
} else { mTheadCodec->start();
if (thd_codec) mThreadHal->start();
thd_codec->stop(); } else
if (thd_hal) clear();
thd_hal->stop();
if (thd_codec) {
delete thd_codec;
thd_codec = NULL;
}
if (thd_hal) {
delete thd_hal;
thd_hal = NULL;
}
if (packets) {
delete packets;
packets = NULL;
}
if (frames) {
delete frames;
frames = NULL;
}
}
} }
Mpp::~Mpp () Mpp::~Mpp ()
{ {
if (thd_codec) clear();
thd_codec->stop(); }
if (thd_hal)
thd_hal->stop();
if (thd_codec) void Mpp::clear()
delete thd_codec; {
if (thd_hal) if (mTheadCodec)
delete thd_hal; mTheadCodec->stop();
if (packets) if (mThreadHal)
delete packets; mThreadHal->stop();
if (frames)
delete frames; if (mTheadCodec) {
delete mTheadCodec;
mTheadCodec = NULL;
}
if (mThreadHal) {
delete mThreadHal;
mThreadHal = NULL;
}
if (mPackets) {
delete mPackets;
mPackets = NULL;
}
if (mFrames) {
delete mFrames;
mFrames = NULL;
}
if (mTasks) {
delete mTasks;
mTasks = NULL;
}
if (mPacketGroup) {
mpp_buffer_group_put(mPacketGroup);
mPacketGroup = NULL;
}
if (mFrameGroup) {
mpp_buffer_group_put(mFrameGroup);
mFrameGroup = NULL;
}
if (mTask)
mpp_free(mTask);
} }
MPP_RET Mpp::put_packet(MppPacket packet) MPP_RET Mpp::put_packet(MppPacket packet)
{ {
// TODO: packet data need preprocess or can be write to hardware buffer Mutex::Autolock autoLock(&mPacketLock);
return (MPP_RET)packets->add_at_tail(packet, sizeof(MppPacketImpl)); if (mPackets->list_size() < 4) {
mPackets->add_at_tail(packet, sizeof(MppPacketImpl));
mPacketPutCount++;
mTheadCodec->signal();
return MPP_OK;
}
return MPP_NOK;
} }
MPP_RET Mpp::get_frame(MppFrame *frame) MPP_RET Mpp::get_frame(MppFrame *frame)
{ {
if (frames->list_size()) { Mutex::Autolock autoLock(&mFrameLock);
frames->del_at_tail(frame, sizeof(frame)); if (mFrames->list_size()) {
mFrames->del_at_tail(frame, sizeof(frame));
mFrameGetCount++;
} }
mThreadHal->signal();
return MPP_OK; return MPP_OK;
} }
MPP_RET Mpp::put_frame(MppFrame frame) MPP_RET Mpp::put_frame(MppFrame frame)
{ {
MPP_RET ret = (MPP_RET)frames->add_at_tail(frame, sizeof(MppFrameImpl)); Mutex::Autolock autoLock(&mFrameLock);
return ret; if (mFrames->list_size() < 4) {
mFrames->add_at_tail(frame, sizeof(MppFrameImpl));
mTheadCodec->signal();
mFramePutCount++;
return MPP_OK;
}
return MPP_NOK;
} }
MPP_RET Mpp::get_packet(MppPacket *packet) MPP_RET Mpp::get_packet(MppPacket *packet)
{ {
if (packets->list_size()) { Mutex::Autolock autoLock(&mPacketLock);
packets->del_at_tail(packet, sizeof(packet)); if (mPackets->list_size()) {
mPackets->del_at_tail(packet, sizeof(packet));
mPacketGetCount++;
} }
return MPP_OK; return MPP_OK;
} }

View File

@@ -20,30 +20,52 @@
#include "rk_mpi.h" #include "rk_mpi.h"
#include "mpp_list.h" #include "mpp_list.h"
#include "mpp_thread.h" #include "mpp_thread.h"
#include "mpp_hal.h"
class Mpp { class Mpp {
public: public:
Mpp(MppCtxType type); Mpp(MppCtxType type, MppCodingType coding);
~Mpp(); ~Mpp();
mpp_list *packets;
mpp_list *frames;
MppThread *thd_codec;
MppThread *thd_hal;
RK_S32 thread_codec_running;
RK_S32 thread_codec_reset;
RK_U32 status;
MPP_RET put_packet(MppPacket packet); MPP_RET put_packet(MppPacket packet);
MPP_RET get_frame(MppFrame *frame); MPP_RET get_frame(MppFrame *frame);
MPP_RET put_frame(MppFrame frame); MPP_RET put_frame(MppFrame frame);
MPP_RET get_packet(MppPacket *packet); MPP_RET get_packet(MppPacket *packet);
Mutex mPacketLock;
Mutex mFrameLock;
Mutex mTaskLock;
mpp_list *mPackets;
mpp_list *mFrames;
mpp_list *mTasks;
RK_U32 mPacketPutCount;
RK_U32 mPacketGetCount;
RK_U32 mFramePutCount;
RK_U32 mFrameGetCount;
RK_U32 mTaskPutCount;
RK_U32 mTaskGetCount;
MppBufferGroup mPacketGroup;
MppBufferGroup mFrameGroup;
MppThread *mTheadCodec;
MppThread *mThreadHal;
MppCtxType mType;
MppCodingType mCoding;
RK_U32 mStatus;
MppHalDecTask **mTask;
RK_U32 mTaskNum;
private: private:
void clear();
Mpp(); Mpp();
Mpp(const Mpp &); Mpp(const Mpp &);
Mpp &operator=(const Mpp &); Mpp &operator=(const Mpp &);

View File

@@ -22,31 +22,26 @@
#include "mpp_mem.h" #include "mpp_mem.h"
#include "mpp_buffer_impl.h" #include "mpp_buffer_impl.h"
#define MPP_BUFFER_SERVICE_LOCK() pthread_mutex_lock(&service.lock) #define SEARCH_GROUP_NORMAL(id) search_group_by_id_no_lock(&service.mListGroup, id)
#define MPP_BUFFER_SERVICE_UNLOCK() pthread_mutex_unlock(&service.lock) #define SEARCH_GROUP_ORPHAN(id) search_group_by_id_no_lock(&service.mListOrphan, id)
#define SEARCH_GROUP_NORMAL(id) search_group_by_id_no_lock(&service.list_group, id) class MppBufferService {
#define SEARCH_GROUP_ORPHAN(id) search_group_by_id_no_lock(&service.list_orphan, id) public:
MppBufferService();
~MppBufferService();
typedef struct { Mutex mLock;
pthread_mutex_t lock;
RK_U32 group_id; RK_U32 group_id;
RK_U32 group_count; RK_U32 group_count;
struct list_head list_group; struct list_head mListGroup;
// list for used buffer which do not have group // list for used buffer which do not have group
struct list_head list_orphan; struct list_head mListOrphan;
} MppBufferService;
static MppBufferService service = {
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
0,
0,
LIST_HEAD_INIT(service.list_group),
LIST_HEAD_INIT(service.list_orphan),
}; };
static MppBufferService service;
static MppBufferGroupImpl *search_group_by_id_no_lock(struct list_head *list, RK_U32 group_id) static MppBufferGroupImpl *search_group_by_id_no_lock(struct list_head *list, RK_U32 group_id)
{ {
MppBufferGroupImpl *pos, *n; MppBufferGroupImpl *pos, *n;
@@ -75,13 +70,9 @@ MPP_RET deinit_buffer_no_lock(MppBufferImpl *buffer)
list_del_init(&buffer->list_status); list_del_init(&buffer->list_status);
MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(buffer->group_id); MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(buffer->group_id);
if (group) { if (group) {
switch (group->mode) { if (buffer->internal)
case MPP_BUFFER_MODE_NORMAL : {
group->alloc_api->free(group->allocator, &buffer->info); group->alloc_api->free(group->allocator, &buffer->info);
} break;
default : {
} break;
}
group->usage -= buffer->info.size; group->usage -= buffer->info.size;
group->count--; group->count--;
} else { } else {
@@ -134,13 +125,13 @@ static MPP_RET inc_buffer_ref_no_lock(MppBufferImpl *buffer)
MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info) MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info)
{ {
MppBufferImpl *p = mpp_malloc(MppBufferImpl, 1); MppBufferImpl *p = mpp_calloc(MppBufferImpl, 1);
if (NULL == p) { if (NULL == p) {
mpp_err("mpp_buffer_create failed to allocate context\n"); mpp_err("mpp_buffer_create failed to allocate context\n");
return MPP_ERR_MALLOC; return MPP_ERR_MALLOC;
} }
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(group_id); MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(group_id);
if (group) { if (group) {
@@ -149,9 +140,9 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info)
if (MPP_OK != ret) { if (MPP_OK != ret) {
mpp_err("mpp_buffer_create failed to create buffer with size %d\n", info->size); mpp_err("mpp_buffer_create failed to create buffer with size %d\n", info->size);
mpp_free(p); mpp_free(p);
MPP_BUFFER_SERVICE_UNLOCK();
return MPP_ERR_MALLOC; return MPP_ERR_MALLOC;
} }
p->internal = 1;
} }
p->info = *info; p->info = *info;
@@ -162,8 +153,6 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info)
strncpy(p->tag, tag, sizeof(p->tag)); strncpy(p->tag, tag, sizeof(p->tag));
p->group_id = group_id; p->group_id = group_id;
p->used = 0;
p->ref_count = 0;
INIT_LIST_HEAD(&p->list_status); INIT_LIST_HEAD(&p->list_status);
list_add_tail(&p->list_status, &group->list_unused); list_add_tail(&p->list_status, &group->list_unused);
group->usage += info->size; group->usage += info->size;
@@ -174,33 +163,22 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info)
p = NULL; p = NULL;
} }
MPP_BUFFER_SERVICE_UNLOCK();
return (p) ? (MPP_OK) : (MPP_NOK); return (p) ? (MPP_OK) : (MPP_NOK);
} }
MPP_RET mpp_buffer_destroy(MppBufferImpl *buffer) MPP_RET mpp_buffer_destroy(MppBufferImpl *buffer)
{ {
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
deinit_buffer_no_lock(buffer); deinit_buffer_no_lock(buffer);
MPP_BUFFER_SERVICE_UNLOCK();
return MPP_OK; return MPP_OK;
} }
MPP_RET mpp_buffer_ref_inc(MppBufferImpl *buffer) MPP_RET mpp_buffer_ref_inc(MppBufferImpl *buffer)
{ {
MPP_RET ret; Mutex::Autolock auto_lock(&service.mLock);
return inc_buffer_ref_no_lock(buffer);
MPP_BUFFER_SERVICE_LOCK();
ret = inc_buffer_ref_no_lock(buffer);
MPP_BUFFER_SERVICE_UNLOCK();
return ret;
} }
@@ -211,7 +189,7 @@ MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer)
return MPP_NOK; return MPP_NOK;
} }
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
buffer->ref_count--; buffer->ref_count--;
if (0 == buffer->ref_count) { if (0 == buffer->ref_count) {
@@ -224,8 +202,6 @@ MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer)
} }
} }
MPP_BUFFER_SERVICE_UNLOCK();
return MPP_OK; return MPP_OK;
} }
@@ -233,7 +209,7 @@ MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size)
{ {
MppBufferImpl *buffer = NULL; MppBufferImpl *buffer = NULL;
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
if (!list_empty(&p->list_unused)) { if (!list_empty(&p->list_unused)) {
MppBufferImpl *pos, *n; MppBufferImpl *pos, *n;
@@ -246,8 +222,6 @@ MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size)
} }
} }
MPP_BUFFER_SERVICE_UNLOCK();
return buffer; return buffer;
} }
@@ -260,13 +234,13 @@ MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBu
return MPP_ERR_MALLOC; return MPP_ERR_MALLOC;
} }
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
INIT_LIST_HEAD(&p->list_group); INIT_LIST_HEAD(&p->list_group);
INIT_LIST_HEAD(&p->list_used); INIT_LIST_HEAD(&p->list_used);
INIT_LIST_HEAD(&p->list_unused); INIT_LIST_HEAD(&p->list_unused);
list_add_tail(&p->list_group, &service.list_group); list_add_tail(&p->list_group, &service.mListGroup);
snprintf(p->tag, sizeof(p->tag), "%s_%d", tag, service.group_id); snprintf(p->tag, sizeof(p->tag), "%s_%d", tag, service.group_id);
p->mode = mode; p->mode = mode;
@@ -289,7 +263,6 @@ MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBu
mpp_alloctor_get(&p->allocator, &p->alloc_api, type); mpp_alloctor_get(&p->allocator, &p->alloc_api, type);
MPP_BUFFER_SERVICE_UNLOCK();
*group = p; *group = p;
return MPP_OK; return MPP_OK;
@@ -302,7 +275,7 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p)
return MPP_ERR_NULL_PTR; return MPP_ERR_NULL_PTR;
} }
MPP_BUFFER_SERVICE_LOCK(); Mutex::Autolock auto_lock(&service.mLock);
// remove unused list // remove unused list
if (!list_empty(&p->list_unused)) { if (!list_empty(&p->list_unused)) {
@@ -317,7 +290,7 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p)
} else { } else {
// otherwise move the group to list_orphan and wait for buffer release // otherwise move the group to list_orphan and wait for buffer release
list_del_init(&p->list_group); list_del_init(&p->list_group);
list_add_tail(&p->list_group, &service.list_orphan); list_add_tail(&p->list_group, &service.mListOrphan);
mpp_err("mpp_group %p deinit with %d buffer not released\n", p, p->usage); mpp_err("mpp_group %p deinit with %d buffer not released\n", p, p->usage);
// if any buffer with mode MPP_BUFFER_MODE_COMMIT found it should be error // if any buffer with mode MPP_BUFFER_MODE_COMMIT found it should be error
MppBufferImpl *pos, *n; MppBufferImpl *pos, *n;
@@ -326,8 +299,34 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p)
} }
} }
MPP_BUFFER_SERVICE_UNLOCK();
return MPP_OK; return MPP_OK;
} }
MppBufferService::MppBufferService()
: group_id(0),
group_count(0)
{
INIT_LIST_HEAD(&mListGroup);
INIT_LIST_HEAD(&mListOrphan);
}
MppBufferService::~MppBufferService()
{
// remove all group first
if (!list_empty(&mListGroup)) {
MppBufferGroupImpl *pos, *n;
list_for_each_entry_safe(pos, n, &mListGroup, MppBufferGroupImpl, list_group) {
mpp_buffer_group_deinit(pos);
}
}
// remove all orphan buffer
if (!list_empty(&mListOrphan)) {
MppBufferImpl *pos, *n;
list_for_each_entry_safe(pos, n, &mListOrphan, MppBufferImpl, list_status) {
deinit_buffer_no_lock(pos);
}
}
}

View File

@@ -43,6 +43,7 @@ struct MppBufferImpl_t {
// used flag is for used/unused list detection // used flag is for used/unused list detection
RK_U32 used; RK_U32 used;
RK_U32 internal;
RK_S32 ref_count; RK_S32 ref_count;
struct list_head list_status; struct list_head list_status;
}; };

View File

@@ -59,6 +59,10 @@ MPP_RET mpp_frame_deinit(MppFrame *frame)
return MPP_ERR_NULL_PTR; return MPP_ERR_NULL_PTR;
} }
MppBuffer buffer = mpp_frame_get_buffer(*frame);
if (buffer)
mpp_buffer_put(buffer);
mpp_free(*frame); mpp_free(*frame);
*frame = NULL; *frame = NULL;
return MPP_OK; return MPP_OK;

View File

@@ -56,25 +56,107 @@ typedef enum {
} MppThreadStatus; } MppThreadStatus;
#ifdef __cplusplus #ifdef __cplusplus
class Mutex;
class Condition;
/*
* for shorter type name and function name
*/
class Mutex {
public:
Mutex();
~Mutex();
void lock();
void unlock();
class Autolock {
public:
inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); }
inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); }
inline ~Autolock() { mLock.unlock(); }
private:
Mutex& mLock;
};
private:
friend class Condition;
pthread_mutex_t mMutex;
Mutex(const Mutex &);
Mutex &operator = (const Mutex&);
};
inline Mutex::Mutex() {
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mMutex, &attr);
pthread_mutexattr_destroy(&attr);
}
inline Mutex::~Mutex() {
pthread_mutex_destroy(&mMutex);
}
inline void Mutex::lock() {
pthread_mutex_lock(&mMutex);
}
inline void Mutex::unlock() {
pthread_mutex_unlock(&mMutex);
}
typedef Mutex::Autolock AutoMutex;
/*
* for shorter type name and function name
*/
class Condition {
public:
Condition();
Condition(int type);
~Condition();
void wait(Mutex& mutex);
void signal();
private:
pthread_cond_t mCond;
};
inline Condition::Condition() {
pthread_cond_init(&mCond, NULL);
}
inline Condition::~Condition() {
pthread_cond_destroy(&mCond);
}
inline void Condition::wait(Mutex& mutex) {
pthread_cond_wait(&mCond, &mutex.mMutex);
}
inline void Condition::signal() {
pthread_cond_signal(&mCond);
}
class MppThread { class MppThread {
public: public:
MppThread(MppThreadFunc func, void *ctx); MppThread(MppThreadFunc func, void *ctx);
~MppThread(); ~MppThread() {};
MppThreadStatus get_status(); MppThreadStatus get_status();
void set_status(MppThreadStatus status); void set_status(MppThreadStatus status);
void start(); void start();
void stop(); void stop();
void lock(); void lock() { mLock.lock(); }
void unlock(); void unlock() { mLock.unlock(); }
void wait(); void wait() { mCondition.wait(mLock); }
void signal(); void signal() { mCondition.signal(); }
private: private:
Mutex mLock;
Condition mCondition;
pthread_t mThread; pthread_t mThread;
pthread_mutex_t mLock;
pthread_cond_t mCondition;
MppThreadStatus mStatus; MppThreadStatus mStatus;
MppThreadFunc mFunction; MppThreadFunc mFunction;
@@ -84,6 +166,7 @@ private:
MppThread(const MppThread &); MppThread(const MppThread &);
MppThread &operator=(const MppThread &); MppThread &operator=(const MppThread &);
}; };
#endif #endif
#endif /*__MPP_THREAD_H__*/ #endif /*__MPP_THREAD_H__*/

View File

@@ -30,19 +30,6 @@ MppThread::MppThread(MppThreadFunc func, void *ctx)
mFunction(func), mFunction(func),
mContext(ctx) mContext(ctx)
{ {
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mLock, &attr);
pthread_mutexattr_destroy(&attr);
pthread_cond_init(&mCondition, NULL);
}
MppThread::~MppThread()
{
pthread_cond_destroy(&mCondition);
pthread_mutex_destroy(&mLock);
} }
MppThreadStatus MppThread::get_status() MppThreadStatus MppThread::get_status()
@@ -75,6 +62,8 @@ void MppThread::start()
void MppThread::stop() void MppThread::stop()
{ {
mStatus = MPP_THREAD_STOPPING; mStatus = MPP_THREAD_STOPPING;
signal();
void *dummy; void *dummy;
pthread_join(mThread, &dummy); pthread_join(mThread, &dummy);
thread_dbg(MPP_THREAD_DBG_FUNCTION, "mThread %p mContext %p destroy success\n", thread_dbg(MPP_THREAD_DBG_FUNCTION, "mThread %p mContext %p destroy success\n",
@@ -83,23 +72,3 @@ void MppThread::stop()
mStatus = MPP_THREAD_UNINITED; mStatus = MPP_THREAD_UNINITED;
} }
void MppThread::lock()
{
pthread_mutex_lock(&mLock);
}
void MppThread::unlock()
{
pthread_mutex_unlock(&mLock);
}
void MppThread::wait()
{
pthread_cond_wait(&mCondition, &mLock);
}
void MppThread::signal()
{
pthread_cond_signal(&mCondition);
}

View File

@@ -118,7 +118,7 @@ int main()
mpp_packet_deinit(&dec_in); mpp_packet_deinit(&dec_in);
} }
for (i = 0; i < MPI_DEC_LOOP_COUNT; i++) { for (i = 0; i < MPI_DEC_LOOP_COUNT; ) {
ret = mpi->decode_get_frame(ctx, &dec_out); ret = mpi->decode_get_frame(ctx, &dec_out);
if (MPP_OK != ret) { if (MPP_OK != ret) {
goto MPP_TEST_FAILED; goto MPP_TEST_FAILED;
@@ -134,6 +134,7 @@ int main()
mpp_frame_deinit(&dec_out); mpp_frame_deinit(&dec_out);
dec_out = next; dec_out = next;
} while (dec_out); } while (dec_out);
i++;
} }
} }