[mpp]: add two thread to mpp

git-svn-id: https://10.10.10.66:8443/svn/MediaProcessPlatform/trunk/mpp@164 6e48237b-75ef-9749-8fc9-41990f28c85a
This commit is contained in:
ChenHengming
2015-08-26 03:53:12 +00:00
parent 027f647eb8
commit 3064dc36e5
4 changed files with 110 additions and 71 deletions

View File

@@ -26,15 +26,11 @@
#include "mpp_packet.h" #include "mpp_packet.h"
#include "mpp_packet_impl.h" #include "mpp_packet_impl.h"
#if 0
static void *thread_hal(void *data) static void *thread_hal(void *data)
{ {
Mpp *mpp = (Mpp*)data; //Mpp *mpp = (Mpp*)data;
mpp_list *frames = mpp->frames;
MppPacketImpl packet;
MppFrame frame;
while () { while (0) {
/* /*
* hal thread wait for dxva interface intput firt * hal thread wait for dxva interface intput firt
*/ */
@@ -58,8 +54,9 @@ static void *thread_hal(void *data)
// mark frame in output queue // mark frame in output queue
// wait up output thread to get a output frame // wait up output thread to get a output frame
} }
return NULL;
} }
#endif
static void *thread_dec(void *data) static void *thread_dec(void *data)
{ {
@@ -146,6 +143,8 @@ static void *thread_enc(void *data)
Mpp::Mpp(MppCtxType type) Mpp::Mpp(MppCtxType type)
: packets(NULL), : packets(NULL),
frames(NULL), frames(NULL),
thd_codec(NULL),
thd_hal(NULL),
thread_codec_running(0), thread_codec_running(0),
thread_codec_reset(0), thread_codec_reset(0),
status(0) status(0)
@@ -154,50 +153,65 @@ Mpp::Mpp(MppCtxType type)
case MPP_CTX_DEC : { case MPP_CTX_DEC : {
packets = new mpp_list((node_destructor)NULL); packets = new mpp_list((node_destructor)NULL);
frames = new mpp_list((node_destructor)mpp_frame_deinit); frames = new mpp_list((node_destructor)mpp_frame_deinit);
thread_start(thread_dec); thd_codec = new MppThread(thread_dec, this);
thd_hal = new MppThread(thread_hal, this);
} break; } break;
case MPP_CTX_ENC : { case MPP_CTX_ENC : {
frames = new mpp_list((node_destructor)NULL); frames = new mpp_list((node_destructor)NULL);
packets = new mpp_list((node_destructor)mpp_packet_deinit); packets = new mpp_list((node_destructor)mpp_packet_deinit);
thread_start(thread_enc); thd_codec = new MppThread(thread_enc, this);
thd_hal = new MppThread(thread_hal, this);
} break; } break;
default : { default : {
mpp_err("Mpp error type %d\n", type); mpp_err("Mpp error type %d\n", type);
} break; } break;
} }
if (packets && frames && thd_codec && thd_hal) {
thd_codec->start();
thd_hal->start();
} else {
if (thd_codec)
thd_codec->stop();
if (thd_hal)
thd_hal->stop();
if (thd_codec) {
delete thd_codec;
thd_codec = NULL;
}
if (thd_hal) {
delete thd_hal;
thd_hal = NULL;
}
if (packets) {
delete packets;
packets = NULL;
}
if (frames) {
delete frames;
frames = NULL;
}
}
} }
Mpp::~Mpp () Mpp::~Mpp ()
{ {
if (thread_codec_running) if (thd_codec)
thread_stop(); thd_codec->stop();
if (thd_hal)
thd_hal->stop();
if (thd_codec)
delete thd_codec;
if (thd_hal)
delete thd_hal;
if (packets) if (packets)
delete packets; delete packets;
if (frames) if (frames)
delete frames; delete frames;
} }
void Mpp::thread_start(MppThreadFunc func)
{
if (!thread_codec_running) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&thread_codec, &attr, func, (void*)this))
status = MPP_ERR_FATAL_THREAD;
else
thread_codec_running = 1;
pthread_attr_destroy(&attr);
}
}
void Mpp::thread_stop()
{
thread_codec_running = 0;
void *dummy;
pthread_join(thread_codec, &dummy);
}
MPP_RET Mpp::put_packet(MppPacket packet) MPP_RET Mpp::put_packet(MppPacket packet)
{ {
// TODO: packet data need preprocess or can be write to hardware buffer // TODO: packet data need preprocess or can be write to hardware buffer

View File

@@ -29,8 +29,9 @@ public:
mpp_list *packets; mpp_list *packets;
mpp_list *frames; mpp_list *frames;
pthread_t thread_codec; MppThread *thd_codec;
pthread_t thread_hal; MppThread *thd_hal;
RK_S32 thread_codec_running; RK_S32 thread_codec_running;
RK_S32 thread_codec_reset; RK_S32 thread_codec_reset;
@@ -43,9 +44,6 @@ public:
MPP_RET get_packet(MppPacket *packet); MPP_RET get_packet(MppPacket *packet);
private: private:
void thread_start(MppThreadFunc func);
void thread_stop();
Mpp(); Mpp();
Mpp(const Mpp &); Mpp(const Mpp &);
Mpp &operator=(const Mpp &); Mpp &operator=(const Mpp &);

View File

@@ -61,19 +61,24 @@ public:
MppThread(MppThreadFunc func, void *ctx); MppThread(MppThreadFunc func, void *ctx);
~MppThread(); ~MppThread();
MppThreadStatus get_status();
void set_status(MppThreadStatus status);
void start();
void stop();
void lock(); void lock();
void unlock(); void unlock();
void wait(); void wait();
void signal(); void signal();
private: private:
pthread_t thread; pthread_t mThread;
pthread_mutex_t thread_lock; pthread_mutex_t mLock;
pthread_cond_t condition; pthread_cond_t mCondition;
MppThreadStatus status; MppThreadStatus mStatus;
MppThreadFunc function; MppThreadFunc mFunction;
void *context; void *mContext;
MppThread(); MppThread();
MppThread(const MppThread &); MppThread(const MppThread &);

View File

@@ -26,56 +26,78 @@ static RK_U32 thread_debug = 0;
#define thread_dbg(flag, fmt, ...) mpp_dbg(thread_debug, flag, fmt, ## __VA_ARGS__) #define thread_dbg(flag, fmt, ...) mpp_dbg(thread_debug, flag, fmt, ## __VA_ARGS__)
MppThread::MppThread(MppThreadFunc func, void *ctx) MppThread::MppThread(MppThreadFunc func, void *ctx)
: status(MPP_THREAD_UNINITED) : mStatus(MPP_THREAD_UNINITED),
mFunction(func),
mContext(ctx)
{ {
pthread_mutexattr_t mutex; pthread_mutexattr_t attr;
pthread_mutexattr_init(&mutex); pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&mutex, PTHREAD_MUTEX_RECURSIVE); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&thread_lock, &mutex); pthread_mutex_init(&mLock, &attr);
pthread_mutexattr_destroy(&mutex); pthread_mutexattr_destroy(&attr);
pthread_cond_init(&condition, NULL);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (0 == pthread_create(&thread, &attr, func, ctx)) {
status = MPP_THREAD_RUNNING;
thread_dbg(MPP_THREAD_DBG_FUNCTION, "thread %p context %p create success\n",
func, ctx);
}
pthread_attr_destroy(&attr);
pthread_cond_init(&mCondition, NULL);
} }
MppThread::~MppThread() MppThread::~MppThread()
{ {
status = MPP_THREAD_STOPPING; pthread_cond_destroy(&mCondition);
pthread_mutex_destroy(&mLock);
}
MppThreadStatus MppThread::get_status()
{
return mStatus;
}
void MppThread::set_status(MppThreadStatus status)
{
mStatus = status;
}
void MppThread::start()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (MPP_THREAD_UNINITED == mStatus) {
if (0 == pthread_create(&mThread, &attr, mFunction, mContext)) {
mStatus = MPP_THREAD_RUNNING;
thread_dbg(MPP_THREAD_DBG_FUNCTION, "mThread %p mContext %p create success\n",
mFunction, mContext);
}
}
pthread_attr_destroy(&attr);
}
void MppThread::stop()
{
mStatus = MPP_THREAD_STOPPING;
void *dummy; void *dummy;
pthread_join(thread, &dummy); pthread_join(mThread, &dummy);
thread_dbg(MPP_THREAD_DBG_FUNCTION, "thread %p context %p destroy success\n", thread_dbg(MPP_THREAD_DBG_FUNCTION, "mThread %p mContext %p destroy success\n",
function, context); mFunction, mContext);
mStatus = MPP_THREAD_UNINITED;
} }
void MppThread::lock() void MppThread::lock()
{ {
pthread_mutex_lock(&thread_lock); pthread_mutex_lock(&mLock);
} }
void MppThread::unlock() void MppThread::unlock()
{ {
pthread_mutex_unlock(&thread_lock); pthread_mutex_unlock(&mLock);
} }
void MppThread::wait() void MppThread::wait()
{ {
pthread_cond_wait(&condition, &thread_lock); pthread_cond_wait(&mCondition, &mLock);
} }
void MppThread::signal() void MppThread::signal()
{ {
pthread_cond_signal(&condition); pthread_cond_signal(&mCondition);
} }