diff --git a/mpp/mpi.cpp b/mpp/mpi.cpp index 31ff9493..2f291ebe 100644 --- a/mpp/mpi.cpp +++ b/mpp/mpi.cpp @@ -184,7 +184,7 @@ MPP_RET mpp_init(MppCtx *ctx, MppApi **mpi, MppCtxType type, MppCodingType codin memset(p, 0, sizeof(*p)); - p->ctx = new Mpp(type); + p->ctx = new Mpp(type, coding); if (NULL == p->ctx) { mpp_free(p); mpp_err("mpp_init failed to new Mpp\n"); diff --git a/mpp/mpp.cpp b/mpp/mpp.cpp index 3c6292d6..c0e67891 100644 --- a/mpp/mpp.cpp +++ b/mpp/mpp.cpp @@ -22,43 +22,62 @@ #include "mpp.h" #include "mpp_frame_impl.h" -#include "mpp_buffer.h" #include "mpp_packet.h" #include "mpp_packet_impl.h" -static void *thread_hal(void *data) +#define MPP_TEST_FRAME_SIZE SZ_1M + +void *thread_hal(void *data) { Mpp *mpp = (Mpp*)data; - MppThread *thd_dec = mpp->thd_codec; - MppThread *thd_hal = mpp->thd_hal; - mpp_list *packets = mpp->packets; - mpp_list *frames = mpp->frames; + MppThread *codec = mpp->mTheadCodec; + MppThread *hal = mpp->mThreadHal; + mpp_list *frames = mpp->mFrames; + mpp_list *tasks = mpp->mTasks; - while (MPP_THREAD_RUNNING == thd_hal->get_status()) { + while (MPP_THREAD_RUNNING == hal->get_status()) { /* * hal thread wait for dxva interface intput firt */ + hal->lock(); + if (0 == tasks->list_size()) + hal->wait(); + hal->unlock(); + // get_config // register genertation + if (tasks->list_size()) { + MppHalDecTask *task; + mpp->mTasks->del_at_head(&task, sizeof(task)); + mpp->mTaskGetCount++; - /* - * wait previous register set done - */ - // hal->get_regs; + /* + * wait previous register set done + */ + // hal->get_regs; - /* - * send current register set to hardware - */ - // hal->put_regs; + /* + * send current register set to hardware + */ + // hal->put_regs; - /* - * mark previous buffer is complete - */ - // signal() - // mark frame in output queue - // wait up output thread to get a output frame + /* + * mark previous buffer is complete + */ + // signal() + // mark frame in output queue + // wait up output thread to get a output frame - msleep(10); + // for test + MppBuffer buffer; + mpp_buffer_get(mpp->mFrameGroup, &buffer, MPP_TEST_FRAME_SIZE); + + MppFrame frame; + mpp_frame_init(&frame); + mpp_frame_set_buffer(frame, buffer); + frames->add_at_tail(&frame, sizeof(frame)); + mpp->mFramePutCount++; + } } return NULL; @@ -67,20 +86,34 @@ static void *thread_hal(void *data) static void *thread_dec(void *data) { Mpp *mpp = (Mpp*)data; - MppThread *thd_dec = mpp->thd_codec; - MppThread *thd_hal = mpp->thd_hal; - mpp_list *packets = mpp->packets; - mpp_list *frames = mpp->frames; + MppThread *dec = mpp->mTheadCodec; + MppThread *hal = mpp->mThreadHal; + mpp_list *packets = mpp->mPackets; + mpp_list *frames = mpp->mFrames; MppPacketImpl packet; - MppFrame frame; - while (MPP_THREAD_RUNNING == thd_dec->get_status()) { + while (MPP_THREAD_RUNNING == dec->get_status()) { + RK_U32 packet_ready = 0; + /* + * wait for stream input + */ + dec->lock(); + if (0 == packets->list_size()) + dec->wait(); + dec->unlock(); + if (packets->list_size()) { + mpp->mPacketLock.lock(); /* * packet will be destroyed outside, here just copy the content */ packets->del_at_head(&packet, sizeof(packet)); + mpp->mPacketGetCount++; + packet_ready = 1; + mpp->mPacketLock.unlock(); + } + if (packet_ready) { /* * 1. send packet data to parser * @@ -106,7 +139,9 @@ static void *thread_dec(void *data) * frame to hal loop. */ - // mpp->get_buffer + //MppBuffer buffer; + //mpp_buffer_get(mpp->mFrameGroup, &buffer, MPP_TEST_FRAME_SIZE); + /* * 3. send dxva output information and buffer information to hal thread @@ -115,11 +150,9 @@ static void *thread_dec(void *data) // hal->wait_prev_done; // hal->send_config; - - - // for test - mpp_frame_init(&frame); - frames->add_at_tail(&frame, sizeof(frame)); + mpp->mTasks->add_at_tail(&mpp->mTask[0], sizeof(mpp->mTask[0])); + mpp->mTaskPutCount++; + hal->signal(); } } @@ -129,14 +162,16 @@ static void *thread_dec(void *data) static void *thread_enc(void *data) { Mpp *mpp = (Mpp*)data; - mpp_list *packets = mpp->packets; - mpp_list *frames = mpp->frames; + MppThread *thd_enc = mpp->mTheadCodec; + MppThread *thd_hal = mpp->mThreadHal; + mpp_list *packets = mpp->mPackets; + mpp_list *frames = mpp->mFrames; MppFrameImpl frame; MppPacket packet; size_t size = SZ_1M; char *buf = mpp_malloc(char, size); - while (mpp->thread_codec_running) { + while (MPP_THREAD_RUNNING == thd_enc->get_status()) { if (frames->list_size()) { frames->del_at_head(&frame, sizeof(frame)); @@ -148,102 +183,147 @@ static void *thread_enc(void *data) return NULL; } -Mpp::Mpp(MppCtxType type) - : packets(NULL), - frames(NULL), - thd_codec(NULL), - thd_hal(NULL), - thread_codec_running(0), - thread_codec_reset(0), - status(0) +Mpp::Mpp(MppCtxType type, MppCodingType coding) + : mPackets(NULL), + mFrames(NULL), + mTasks(NULL), + mPacketPutCount(0), + mPacketGetCount(0), + mFramePutCount(0), + mFrameGetCount(0), + mTaskPutCount(0), + mTaskGetCount(0), + mPacketGroup(NULL), + mFrameGroup(NULL), + mTheadCodec(NULL), + mThreadHal(NULL), + mType(type), + mCoding(coding), + mStatus(0), + mTask(NULL), + mTaskNum(2) { - switch (type) { + switch (mType) { case MPP_CTX_DEC : { - packets = new mpp_list((node_destructor)NULL); - frames = new mpp_list((node_destructor)mpp_frame_deinit); - thd_codec = new MppThread(thread_dec, this); - thd_hal = new MppThread(thread_hal, this); + mPackets = new mpp_list((node_destructor)NULL); + mFrames = new mpp_list((node_destructor)mpp_frame_deinit); + mTasks = new mpp_list((node_destructor)NULL); + mTheadCodec = new MppThread(thread_dec, this); + mThreadHal = new MppThread(thread_hal, this); + mTask = mpp_malloc(MppHalDecTask*, mTaskNum); + mpp_buffer_group_normal_get(&mPacketGroup, MPP_BUFFER_TYPE_NORMAL); + mpp_buffer_group_limited_get(&mFrameGroup, MPP_BUFFER_TYPE_ION); + mpp_buffer_group_limit_config(mFrameGroup, 4, MPP_TEST_FRAME_SIZE); } break; case MPP_CTX_ENC : { - frames = new mpp_list((node_destructor)NULL); - packets = new mpp_list((node_destructor)mpp_packet_deinit); - thd_codec = new MppThread(thread_enc, this); - thd_hal = new MppThread(thread_hal, this); + mFrames = new mpp_list((node_destructor)NULL); + mPackets = new mpp_list((node_destructor)mpp_packet_deinit); + mTasks = new mpp_list((node_destructor)NULL); + mTheadCodec = new MppThread(thread_enc, this); + mThreadHal = new MppThread(thread_hal, this); + mTask = mpp_malloc(MppHalDecTask*, mTaskNum); + mpp_buffer_group_normal_get(&mPacketGroup, MPP_BUFFER_TYPE_NORMAL); + mpp_buffer_group_limited_get(&mFrameGroup, MPP_BUFFER_TYPE_ION); } break; default : { - mpp_err("Mpp error type %d\n", type); + mpp_err("Mpp error type %d\n", mType); } break; } - if (packets && frames && thd_codec && thd_hal) { - thd_codec->start(); - thd_hal->start(); - } else { - if (thd_codec) - thd_codec->stop(); - if (thd_hal) - thd_hal->stop(); - - if (thd_codec) { - delete thd_codec; - thd_codec = NULL; - } - if (thd_hal) { - delete thd_hal; - thd_hal = NULL; - } - if (packets) { - delete packets; - packets = NULL; - } - if (frames) { - delete frames; - frames = NULL; - } - } + if (mFrames && mPackets && mTask && + mTheadCodec && mThreadHal && + mPacketGroup && mFrameGroup) { + mTheadCodec->start(); + mThreadHal->start(); + } else + clear(); } Mpp::~Mpp () { - if (thd_codec) - thd_codec->stop(); - if (thd_hal) - thd_hal->stop(); + clear(); +} - if (thd_codec) - delete thd_codec; - if (thd_hal) - delete thd_hal; - if (packets) - delete packets; - if (frames) - delete frames; +void Mpp::clear() +{ + if (mTheadCodec) + mTheadCodec->stop(); + if (mThreadHal) + mThreadHal->stop(); + + if (mTheadCodec) { + delete mTheadCodec; + mTheadCodec = NULL; + } + if (mThreadHal) { + delete mThreadHal; + mThreadHal = NULL; + } + if (mPackets) { + delete mPackets; + mPackets = NULL; + } + if (mFrames) { + delete mFrames; + mFrames = NULL; + } + if (mTasks) { + delete mTasks; + mTasks = NULL; + } + if (mPacketGroup) { + mpp_buffer_group_put(mPacketGroup); + mPacketGroup = NULL; + } + if (mFrameGroup) { + mpp_buffer_group_put(mFrameGroup); + mFrameGroup = NULL; + } + if (mTask) + mpp_free(mTask); } MPP_RET Mpp::put_packet(MppPacket packet) { - // TODO: packet data need preprocess or can be write to hardware buffer - return (MPP_RET)packets->add_at_tail(packet, sizeof(MppPacketImpl)); + Mutex::Autolock autoLock(&mPacketLock); + if (mPackets->list_size() < 4) { + mPackets->add_at_tail(packet, sizeof(MppPacketImpl)); + mPacketPutCount++; + mTheadCodec->signal(); + return MPP_OK; + } + return MPP_NOK; } MPP_RET Mpp::get_frame(MppFrame *frame) { - if (frames->list_size()) { - frames->del_at_tail(frame, sizeof(frame)); + Mutex::Autolock autoLock(&mFrameLock); + if (mFrames->list_size()) { + mFrames->del_at_tail(frame, sizeof(frame)); + mFrameGetCount++; } + mThreadHal->signal(); return MPP_OK; } MPP_RET Mpp::put_frame(MppFrame frame) { - MPP_RET ret = (MPP_RET)frames->add_at_tail(frame, sizeof(MppFrameImpl)); - return ret; + Mutex::Autolock autoLock(&mFrameLock); + if (mFrames->list_size() < 4) { + mFrames->add_at_tail(frame, sizeof(MppFrameImpl)); + mTheadCodec->signal(); + mFramePutCount++; + return MPP_OK; + } + return MPP_NOK; } MPP_RET Mpp::get_packet(MppPacket *packet) { - if (packets->list_size()) { - packets->del_at_tail(packet, sizeof(packet)); + Mutex::Autolock autoLock(&mPacketLock); + if (mPackets->list_size()) { + mPackets->del_at_tail(packet, sizeof(packet)); + mPacketGetCount++; } return MPP_OK; } diff --git a/mpp/mpp.h b/mpp/mpp.h index 0313a6b0..4661ee04 100644 --- a/mpp/mpp.h +++ b/mpp/mpp.h @@ -20,30 +20,52 @@ #include "rk_mpi.h" #include "mpp_list.h" #include "mpp_thread.h" +#include "mpp_hal.h" class Mpp { public: - Mpp(MppCtxType type); + Mpp(MppCtxType type, MppCodingType coding); ~Mpp(); - mpp_list *packets; - mpp_list *frames; - - MppThread *thd_codec; - MppThread *thd_hal; - - RK_S32 thread_codec_running; - RK_S32 thread_codec_reset; - - RK_U32 status; - MPP_RET put_packet(MppPacket packet); MPP_RET get_frame(MppFrame *frame); MPP_RET put_frame(MppFrame frame); MPP_RET get_packet(MppPacket *packet); + + Mutex mPacketLock; + Mutex mFrameLock; + Mutex mTaskLock; + + mpp_list *mPackets; + mpp_list *mFrames; + mpp_list *mTasks; + + RK_U32 mPacketPutCount; + RK_U32 mPacketGetCount; + RK_U32 mFramePutCount; + RK_U32 mFrameGetCount; + RK_U32 mTaskPutCount; + RK_U32 mTaskGetCount; + + MppBufferGroup mPacketGroup; + MppBufferGroup mFrameGroup; + + MppThread *mTheadCodec; + MppThread *mThreadHal; + + MppCtxType mType; + MppCodingType mCoding; + + RK_U32 mStatus; + + MppHalDecTask **mTask; + RK_U32 mTaskNum; + private: + void clear(); + Mpp(); Mpp(const Mpp &); Mpp &operator=(const Mpp &); diff --git a/mpp/mpp_buffer_impl.cpp b/mpp/mpp_buffer_impl.cpp index a03a2efd..f8f4b09f 100644 --- a/mpp/mpp_buffer_impl.cpp +++ b/mpp/mpp_buffer_impl.cpp @@ -22,31 +22,26 @@ #include "mpp_mem.h" #include "mpp_buffer_impl.h" -#define MPP_BUFFER_SERVICE_LOCK() pthread_mutex_lock(&service.lock) -#define MPP_BUFFER_SERVICE_UNLOCK() pthread_mutex_unlock(&service.lock) +#define SEARCH_GROUP_NORMAL(id) search_group_by_id_no_lock(&service.mListGroup, id) +#define SEARCH_GROUP_ORPHAN(id) search_group_by_id_no_lock(&service.mListOrphan, id) -#define SEARCH_GROUP_NORMAL(id) search_group_by_id_no_lock(&service.list_group, id) -#define SEARCH_GROUP_ORPHAN(id) search_group_by_id_no_lock(&service.list_orphan, id) +class MppBufferService { +public: + MppBufferService(); + ~MppBufferService(); -typedef struct { - pthread_mutex_t lock; + Mutex mLock; RK_U32 group_id; RK_U32 group_count; - struct list_head list_group; + struct list_head mListGroup; // list for used buffer which do not have group - struct list_head list_orphan; -} MppBufferService; - -static MppBufferService service = { - PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, - 0, - 0, - LIST_HEAD_INIT(service.list_group), - LIST_HEAD_INIT(service.list_orphan), + struct list_head mListOrphan; }; +static MppBufferService service; + static MppBufferGroupImpl *search_group_by_id_no_lock(struct list_head *list, RK_U32 group_id) { MppBufferGroupImpl *pos, *n; @@ -75,13 +70,9 @@ MPP_RET deinit_buffer_no_lock(MppBufferImpl *buffer) list_del_init(&buffer->list_status); MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(buffer->group_id); if (group) { - switch (group->mode) { - case MPP_BUFFER_MODE_NORMAL : { + if (buffer->internal) group->alloc_api->free(group->allocator, &buffer->info); - } break; - default : { - } break; - } + group->usage -= buffer->info.size; group->count--; } else { @@ -134,13 +125,13 @@ static MPP_RET inc_buffer_ref_no_lock(MppBufferImpl *buffer) MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info) { - MppBufferImpl *p = mpp_malloc(MppBufferImpl, 1); + MppBufferImpl *p = mpp_calloc(MppBufferImpl, 1); if (NULL == p) { mpp_err("mpp_buffer_create failed to allocate context\n"); return MPP_ERR_MALLOC; } - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); MppBufferGroupImpl *group = SEARCH_GROUP_NORMAL(group_id); if (group) { @@ -149,9 +140,9 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info) if (MPP_OK != ret) { mpp_err("mpp_buffer_create failed to create buffer with size %d\n", info->size); mpp_free(p); - MPP_BUFFER_SERVICE_UNLOCK(); return MPP_ERR_MALLOC; } + p->internal = 1; } p->info = *info; @@ -162,8 +153,6 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info) strncpy(p->tag, tag, sizeof(p->tag)); p->group_id = group_id; - p->used = 0; - p->ref_count = 0; INIT_LIST_HEAD(&p->list_status); list_add_tail(&p->list_status, &group->list_unused); group->usage += info->size; @@ -174,33 +163,22 @@ MPP_RET mpp_buffer_create(const char *tag, RK_U32 group_id, MppBufferInfo *info) p = NULL; } - MPP_BUFFER_SERVICE_UNLOCK(); - return (p) ? (MPP_OK) : (MPP_NOK); } MPP_RET mpp_buffer_destroy(MppBufferImpl *buffer) { - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); deinit_buffer_no_lock(buffer); - MPP_BUFFER_SERVICE_UNLOCK(); - return MPP_OK; } MPP_RET mpp_buffer_ref_inc(MppBufferImpl *buffer) { - MPP_RET ret; - - MPP_BUFFER_SERVICE_LOCK(); - - ret = inc_buffer_ref_no_lock(buffer); - - MPP_BUFFER_SERVICE_UNLOCK(); - - return ret; + Mutex::Autolock auto_lock(&service.mLock); + return inc_buffer_ref_no_lock(buffer); } @@ -211,7 +189,7 @@ MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer) return MPP_NOK; } - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); buffer->ref_count--; if (0 == buffer->ref_count) { @@ -224,8 +202,6 @@ MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer) } } - MPP_BUFFER_SERVICE_UNLOCK(); - return MPP_OK; } @@ -233,7 +209,7 @@ MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size) { MppBufferImpl *buffer = NULL; - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); if (!list_empty(&p->list_unused)) { MppBufferImpl *pos, *n; @@ -246,8 +222,6 @@ MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size) } } - MPP_BUFFER_SERVICE_UNLOCK(); - return buffer; } @@ -260,13 +234,13 @@ MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBu return MPP_ERR_MALLOC; } - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); INIT_LIST_HEAD(&p->list_group); INIT_LIST_HEAD(&p->list_used); INIT_LIST_HEAD(&p->list_unused); - list_add_tail(&p->list_group, &service.list_group); + list_add_tail(&p->list_group, &service.mListGroup); snprintf(p->tag, sizeof(p->tag), "%s_%d", tag, service.group_id); p->mode = mode; @@ -289,7 +263,6 @@ MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBu mpp_alloctor_get(&p->allocator, &p->alloc_api, type); - MPP_BUFFER_SERVICE_UNLOCK(); *group = p; return MPP_OK; @@ -302,7 +275,7 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p) return MPP_ERR_NULL_PTR; } - MPP_BUFFER_SERVICE_LOCK(); + Mutex::Autolock auto_lock(&service.mLock); // remove unused list if (!list_empty(&p->list_unused)) { @@ -317,7 +290,7 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p) } else { // otherwise move the group to list_orphan and wait for buffer release list_del_init(&p->list_group); - list_add_tail(&p->list_group, &service.list_orphan); + list_add_tail(&p->list_group, &service.mListOrphan); mpp_err("mpp_group %p deinit with %d buffer not released\n", p, p->usage); // if any buffer with mode MPP_BUFFER_MODE_COMMIT found it should be error MppBufferImpl *pos, *n; @@ -326,8 +299,34 @@ MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p) } } - MPP_BUFFER_SERVICE_UNLOCK(); - return MPP_OK; } +MppBufferService::MppBufferService() + : group_id(0), + group_count(0) +{ + INIT_LIST_HEAD(&mListGroup); + INIT_LIST_HEAD(&mListOrphan); +} + +MppBufferService::~MppBufferService() +{ + // remove all group first + if (!list_empty(&mListGroup)) { + MppBufferGroupImpl *pos, *n; + list_for_each_entry_safe(pos, n, &mListGroup, MppBufferGroupImpl, list_group) { + mpp_buffer_group_deinit(pos); + } + } + + // remove all orphan buffer + if (!list_empty(&mListOrphan)) { + MppBufferImpl *pos, *n; + list_for_each_entry_safe(pos, n, &mListOrphan, MppBufferImpl, list_status) { + deinit_buffer_no_lock(pos); + } + } +} + + diff --git a/mpp/mpp_buffer_impl.h b/mpp/mpp_buffer_impl.h index 75c496ec..23d4fbee 100644 --- a/mpp/mpp_buffer_impl.h +++ b/mpp/mpp_buffer_impl.h @@ -43,6 +43,7 @@ struct MppBufferImpl_t { // used flag is for used/unused list detection RK_U32 used; + RK_U32 internal; RK_S32 ref_count; struct list_head list_status; }; diff --git a/mpp/mpp_frame.cpp b/mpp/mpp_frame.cpp index a10e7d11..9c67f9db 100644 --- a/mpp/mpp_frame.cpp +++ b/mpp/mpp_frame.cpp @@ -59,6 +59,10 @@ MPP_RET mpp_frame_deinit(MppFrame *frame) return MPP_ERR_NULL_PTR; } + MppBuffer buffer = mpp_frame_get_buffer(*frame); + if (buffer) + mpp_buffer_put(buffer); + mpp_free(*frame); *frame = NULL; return MPP_OK; diff --git a/osal/inc/mpp_thread.h b/osal/inc/mpp_thread.h index 91902323..ebc41e43 100644 --- a/osal/inc/mpp_thread.h +++ b/osal/inc/mpp_thread.h @@ -56,25 +56,107 @@ typedef enum { } MppThreadStatus; #ifdef __cplusplus + +class Mutex; +class Condition; + +/* + * for shorter type name and function name + */ +class Mutex { +public: + Mutex(); + ~Mutex(); + + void lock(); + void unlock(); + + class Autolock { + public: + inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); } + inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); } + inline ~Autolock() { mLock.unlock(); } + private: + Mutex& mLock; + }; + +private: + friend class Condition; + + pthread_mutex_t mMutex; + + Mutex(const Mutex &); + Mutex &operator = (const Mutex&); +}; + +inline Mutex::Mutex() { + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&mMutex, &attr); + pthread_mutexattr_destroy(&attr); +} +inline Mutex::~Mutex() { + pthread_mutex_destroy(&mMutex); +} +inline void Mutex::lock() { + pthread_mutex_lock(&mMutex); +} +inline void Mutex::unlock() { + pthread_mutex_unlock(&mMutex); +} + +typedef Mutex::Autolock AutoMutex; + + +/* + * for shorter type name and function name + */ +class Condition { +public: + Condition(); + Condition(int type); + ~Condition(); + void wait(Mutex& mutex); + void signal(); + +private: + pthread_cond_t mCond; +}; + +inline Condition::Condition() { + pthread_cond_init(&mCond, NULL); +} +inline Condition::~Condition() { + pthread_cond_destroy(&mCond); +} +inline void Condition::wait(Mutex& mutex) { + pthread_cond_wait(&mCond, &mutex.mMutex); +} +inline void Condition::signal() { + pthread_cond_signal(&mCond); +} + + class MppThread { public: MppThread(MppThreadFunc func, void *ctx); - ~MppThread(); + ~MppThread() {}; MppThreadStatus get_status(); void set_status(MppThreadStatus status); void start(); void stop(); - void lock(); - void unlock(); - void wait(); - void signal(); + void lock() { mLock.lock(); } + void unlock() { mLock.unlock(); } + void wait() { mCondition.wait(mLock); } + void signal() { mCondition.signal(); } private: + Mutex mLock; + Condition mCondition; pthread_t mThread; - pthread_mutex_t mLock; - pthread_cond_t mCondition; MppThreadStatus mStatus; MppThreadFunc mFunction; @@ -84,6 +166,7 @@ private: MppThread(const MppThread &); MppThread &operator=(const MppThread &); }; + #endif #endif /*__MPP_THREAD_H__*/ diff --git a/osal/mpp_thread.cpp b/osal/mpp_thread.cpp index b6732464..062d2833 100644 --- a/osal/mpp_thread.cpp +++ b/osal/mpp_thread.cpp @@ -30,19 +30,6 @@ MppThread::MppThread(MppThreadFunc func, void *ctx) mFunction(func), mContext(ctx) { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&mLock, &attr); - pthread_mutexattr_destroy(&attr); - - pthread_cond_init(&mCondition, NULL); -} - -MppThread::~MppThread() -{ - pthread_cond_destroy(&mCondition); - pthread_mutex_destroy(&mLock); } MppThreadStatus MppThread::get_status() @@ -75,6 +62,8 @@ void MppThread::start() void MppThread::stop() { mStatus = MPP_THREAD_STOPPING; + signal(); + void *dummy; pthread_join(mThread, &dummy); thread_dbg(MPP_THREAD_DBG_FUNCTION, "mThread %p mContext %p destroy success\n", @@ -83,23 +72,3 @@ void MppThread::stop() mStatus = MPP_THREAD_UNINITED; } -void MppThread::lock() -{ - pthread_mutex_lock(&mLock); -} - -void MppThread::unlock() -{ - pthread_mutex_unlock(&mLock); -} - -void MppThread::wait() -{ - pthread_cond_wait(&mCondition, &mLock); -} - -void MppThread::signal() -{ - pthread_cond_signal(&mCondition); -} - diff --git a/test/mpi_test.c b/test/mpi_test.c index cf604886..f11007d2 100644 --- a/test/mpi_test.c +++ b/test/mpi_test.c @@ -118,7 +118,7 @@ int main() mpp_packet_deinit(&dec_in); } - for (i = 0; i < MPI_DEC_LOOP_COUNT; i++) { + for (i = 0; i < MPI_DEC_LOOP_COUNT; ) { ret = mpi->decode_get_frame(ctx, &dec_out); if (MPP_OK != ret) { goto MPP_TEST_FAILED; @@ -134,6 +134,7 @@ int main() mpp_frame_deinit(&dec_out); dec_out = next; } while (dec_out); + i++; } }