[mpp/mpp_enc]: Add async encoder flow

Change-Id: I0e040b8e9b1cdb40be996afc818a7ffb16802d09
Signed-off-by: Herman Chen <herman.chen@rock-chips.com>
This commit is contained in:
Herman Chen
2022-02-15 09:00:24 +08:00
parent 603b117d91
commit 2e25db1a7d
8 changed files with 873 additions and 47 deletions

View File

@@ -165,6 +165,7 @@ MPP_RET mpp_enc_init_v2(MppEnc *ctx, MppEncInitCfg *cfg);
MPP_RET mpp_enc_deinit_v2(MppEnc ctx);
MPP_RET mpp_enc_start_v2(MppEnc ctx);
MPP_RET mpp_enc_start_async(MppEnc ctx);
MPP_RET mpp_enc_stop_v2(MppEnc ctx);
MPP_RET mpp_enc_control_v2(MppEnc ctx, MpiCmd cmd, void *param);

View File

@@ -138,6 +138,7 @@ extern "C" {
#endif
void *mpp_enc_thread(void *data);
void *mpp_enc_async_thread(void *data);
#ifdef __cplusplus
}

View File

@@ -631,7 +631,7 @@ static void mpp_dec_put_frame(Mpp *mpp, RK_S32 index, HalDecTaskFlag flags)
dec_vproc_signal(dec->vproc);
} else {
// direct output -> copy a new MppFrame and output
mpp_list *list = mpp->mFrames;
mpp_list *list = mpp->mFrmOut;
MppFrame out = NULL;
mpp_frame_init(&out);
@@ -954,8 +954,8 @@ static MPP_RET try_proc_dec_task(Mpp *mpp, DecTask *task)
dec_dbg_detail("detail: %p check prev task pass\n", dec);
/* too many frame delay in dispaly queue */
if (mpp->mFrames) {
task->wait.dis_que_full = (mpp->mFrames->list_size() > 4) ? 1 : 0;
if (mpp->mFrmOut) {
task->wait.dis_que_full = (mpp->mFrmOut->list_size() > 4) ? 1 : 0;
if (task->wait.dis_que_full)
return MPP_ERR_DISPLAY_FULL;
}

View File

@@ -2097,3 +2097,717 @@ void *mpp_enc_thread(void *data)
return NULL;
}
/* encoder internal work flow */
typedef union EncAsyncStatus_u {
RK_U32 val;
struct {
RK_U32 task_hnd_rdy : 1;
RK_U32 task_in_rdy : 1;
RK_U32 task_out_rdy : 1;
RK_U32 frm_pkt_rdy : 1;
RK_U32 hal_task_reset_rdy : 1; // reset hal task to start
RK_U32 rc_check_frm_drop : 1; // rc stage
RK_U32 pkt_buf_rdy : 1; // prepare pkt buf
RK_U32 enc_start : 1; // enc stage
RK_U32 refs_force_update : 1; // enc stage
RK_U32 low_delay_again : 1; // enc stage low delay output again
RK_U32 enc_backup : 1; // enc stage
RK_U32 enc_restore : 1; // reenc flow start point
RK_U32 enc_proc_dpb : 1; // enc stage
RK_U32 rc_frm_start : 1; // rc stage
RK_U32 check_type_reenc : 1; // flow checkpoint if reenc -> enc_restore
RK_U32 enc_proc_hal : 1; // enc stage
RK_U32 hal_get_task : 1; // hal stage
RK_U32 rc_hal_start : 1; // rc stage
RK_U32 hal_gen_reg : 1; // hal stage
RK_U32 hal_start : 1; // hal stage
RK_U32 hal_wait : 1; // hal stage NOTE: special in low delay mode
RK_U32 rc_hal_end : 1; // rc stage
RK_U32 hal_ret_task : 1; // hal stage
RK_U32 enc_update_hal : 1; // enc stage
RK_U32 rc_frm_end : 1; // rc stage
RK_U32 check_rc_reenc : 1; // flow checkpoint if reenc -> enc_restore
RK_U32 enc_done : 1; // done stage
RK_U32 slice_out_done : 1;
};
} EncAsyncStatus;
typedef union EncAsyncWait_u {
RK_U32 val;
struct {
RK_U32 enc_frm_in : 1; // 0x0001 MPP_ENC_NOTIFY_FRAME_ENQUEUE
RK_U32 reserv0002 : 1; // 0x0002
RK_U32 reserv0004 : 1; // 0x0004
RK_U32 enc_pkt_out : 1; // 0x0008 MPP_ENC_NOTIFY_PACKET_ENQUEUE
RK_U32 task_hnd : 1; // 0x0010
RK_U32 reserv0020 : 1; // 0x0020
RK_U32 reserv0040 : 1; // 0x0040
RK_U32 reserv0080 : 1; // 0x0080
RK_U32 reserv0100 : 1; // 0x0100
RK_U32 reserv0200 : 1; // 0x0200
RK_U32 reserv0400 : 1; // 0x0400
RK_U32 reserv0800 : 1; // 0x0800
RK_U32 reserv1000 : 1; // 0x1000
RK_U32 reserv2000 : 1; // 0x2000
RK_U32 reserv4000 : 1; // 0x4000
RK_U32 reserv8000 : 1; // 0x8000
};
} EncAsyncWait;
typedef struct EncAsyncTaskInfo_t {
RK_S32 seq_idx;
EncAsyncStatus status;
RK_S64 pts;
HalEncTask task;
EncRcTask rc;
MppEncRefFrmUsrCfg usr;
} EncAsyncTaskInfo;
typedef struct EncAsyncTask_t {
HalTaskGroup tasks;
HalTaskHnd hnd;
EncAsyncTaskInfo info;
} EncAsyncTask;
static void reset_async_task(EncAsyncTask *task)
{
memset(&task->info, 0, sizeof(task->info));
task->info.task.rc_task = &task->info.rc;
task->info.task.frm_cfg = &task->info.usr;
}
static void terminate_async_task(MppEncImpl *enc, EncAsyncTask *async)
{
HalEncTask *hal_task = &async->info.task;
EncFrmStatus *frm = &enc->rc_task.frm;
Mpp *mpp = (Mpp *)enc->mpp;
mpp_stopwatch_record(hal_task->stopwatch, "encode task done");
if (hal_task->packet) {
MppPacket pkt = hal_task->packet;
/* setup output packet and meta data */
mpp_packet_set_length(pkt, hal_task->length);
/*
* First return output packet.
* Then enqueue task back to input port.
* Final user will release the mpp_frame they had input.
*/
enc_dbg_detail("task %d enqueue packet pts %lld\n", frm->seq_idx, enc->task_pts);
if (mpp->mPktOut) {
mpp_list *pkt_out = mpp->mPktOut;
if (enc->frame) {
MppMeta meta = mpp_packet_get_meta(pkt);
MppStopwatch stopwatch = mpp_frame_get_stopwatch(enc->frame);
mpp_stopwatch_record(stopwatch, "encode task terminate");
mpp_meta_set_frame(meta, KEY_INPUT_FRAME, enc->frame);
enc->frame = NULL;
}
AutoMutex autolock(pkt_out->mutex());
pkt_out->add_at_tail(&pkt, sizeof(pkt));
mpp->mPacketPutCount++;
pkt_out->signal();
mpp_assert(pkt);
enc_dbg_detail("packet out ready\n");
}
}
reset_async_task(async);
}
static MPP_RET check_async_frm_pkt(EncAsyncTask *async)
{
HalEncTask *hal_task = &async->info.task;
MppPacket packet = hal_task->packet;
MppFrame frame = hal_task->frame;
hal_task->input = NULL;
hal_task->output = NULL;
if (packet)
hal_task->output = mpp_packet_get_buffer(packet);
else {
mpp_packet_new(&packet);
hal_task->packet = packet;
}
if (frame) {
RK_S64 pts = mpp_frame_get_pts(frame);
MppBuffer frm_buf = mpp_frame_get_buffer(frame);
hal_task->input = frm_buf;
mpp_packet_set_pts(packet, pts);
if (mpp_frame_get_eos(frame))
mpp_packet_set_eos(packet);
else
mpp_packet_clr_eos(packet);
}
return (NULL == frame || NULL == hal_task->input) ? MPP_NOK : MPP_OK;
}
static MPP_RET check_async_pkt_buf(MppEncImpl *enc, EncAsyncTask *async)
{
HalEncTask *hal_task = &async->info.task;
if (NULL == hal_task->output) {
/* NOTE: set buffer w * h * 1.5 to avoid buffer overflow */
Mpp *mpp = (Mpp *)enc->mpp;
MppEncPrepCfg *prep = &enc->cfg.prep;
RK_U32 width = MPP_ALIGN(prep->width, 16);
RK_U32 height = MPP_ALIGN(prep->height, 16);
RK_U32 size = (enc->coding == MPP_VIDEO_CodingMJPEG) ?
(width * height * 3 / 2) : (width * height);
MppPacketImpl *pkt = (MppPacketImpl *)hal_task->packet;
MppBuffer buffer = NULL;
mpp_assert(size);
mpp_buffer_get(mpp->mPacketGroup, &buffer, size);
mpp_assert(buffer);
enc->pkt_buf = buffer;
pkt->data = mpp_buffer_get_ptr(buffer);
pkt->pos = pkt->data;
pkt->size = mpp_buffer_get_size(buffer);
pkt->length = 0;
pkt->buffer = buffer;
hal_task->output = buffer;
enc_dbg_detail("create output pkt %p buf %p\n", hal_task->packet, buffer);
} else {
enc_dbg_detail("output to pkt %p buf %p pos %p length %d\n",
hal_task->packet, hal_task->output,
mpp_packet_get_pos(hal_task->packet),
mpp_packet_get_length(hal_task->packet));
}
return MPP_OK;
}
static MPP_RET try_get_async_task(MppEncImpl *enc, EncAsyncTask *async, EncAsyncWait *wait)
{
Mpp *mpp = (Mpp *)enc->mpp;
EncRcTask *rc_task = &enc->rc_task;
EncFrmStatus *frm = &rc_task->frm;
MppEncHeaderStatus *hdr_status = &enc->hdr_status;
HalEncTask *hal_task = &async->info.task;
EncAsyncStatus *status = &async->info.status;
MppStopwatch stopwatch = NULL;
MppPacket packet = hal_task->packet;
MppFrame frame = hal_task->frame;
RK_U32 seq_idx = 0;
MPP_RET ret = MPP_OK;
if (NULL == async->hnd) {
hal_task_get_hnd(async->tasks, TASK_IDLE, &async->hnd);
if (async->hnd) {
status->task_hnd_rdy = 1;
wait->task_hnd = 0;
enc_dbg_detail("get hnd success\n");
reset_async_task(async);
} else {
wait->task_hnd = 1;
enc_dbg_detail("get hnd failed\n");
return MPP_NOK;
}
}
if (NULL == frame) {
if (mpp->mFrmIn) {
mpp_list *frm_in = mpp->mFrmIn;
AutoMutex autolock(frm_in->mutex());
if (frm_in->list_size()) {
frm_in->del_at_head(&frame, sizeof(frame));
mpp->mFrameGetCount++;
mpp_assert(frame);
status->task_in_rdy = 1;
wait->enc_frm_in = 0;
enc_dbg_detail("get input frame success\n");
stopwatch = mpp_frame_get_stopwatch(frame);
mpp_stopwatch_record(stopwatch, "encode task start");
hal_task->frame = frame;
}
}
if (NULL == frame) {
enc_dbg_detail("get input frame failed\n");
wait->enc_frm_in = 1;
return MPP_NOK;
}
}
if (NULL == packet) {
if (mpp_frame_has_meta(frame)) {
MppMeta meta = mpp_frame_get_meta(frame);
mpp_meta_get_packet(meta, KEY_OUTPUT_PACKET, &packet);
}
if (packet) {
status->task_out_rdy = 1;
wait->enc_pkt_out = 0;
enc_dbg_detail("get output packet success\n");
hal_task->packet = packet;
}
}
mpp_assert(frame);
stopwatch = mpp_frame_get_stopwatch(frame);
/*
* 6. get frame and packet for one task
* If there is no input frame just return empty packet task
*/
if (!status->frm_pkt_rdy) {
if (check_async_frm_pkt(async)) {
mpp_stopwatch_record(stopwatch, "invalid on check frm pkt");
reset_hal_enc_task(hal_task);
ret = MPP_NOK;
goto TASK_DONE;
}
status->frm_pkt_rdy = 1;
hal_task->stopwatch = stopwatch;
enc_dbg_detail("task frame packet ready\n");
async->info.seq_idx = enc->task_idx++;
async->info.pts = mpp_frame_get_pts(hal_task->frame);
rc_task->frame = async->info.task.frame;
enc_dbg_detail("task seq idx %d start\n", seq_idx);
}
seq_idx = async->info.seq_idx;
// 9. check frame drop by frame rate conversion
if (!status->rc_check_frm_drop) {
ENC_RUN_FUNC2(rc_frm_check_drop, enc->rc_ctx, rc_task, enc->mpp, ret);
status->rc_check_frm_drop = 1;
enc_dbg_detail("task %d drop %d\n", seq_idx, frm->drop);
// when the frame should be dropped just return empty packet
if (frm->drop) {
mpp_stopwatch_record(stopwatch, "invalid on frame rate drop");
hal_task->valid = 0;
hal_task->length = 0;
ret = MPP_NOK;
goto TASK_DONE;
}
hal_task->valid = 1;
}
// start encoder task process here
mpp_assert(hal_task->valid);
// 10. check and create packet for output
if (!status->pkt_buf_rdy) {
check_async_pkt_buf(enc, async);
status->pkt_buf_rdy = 1;
enc_dbg_detail("task %d check pkt buffer success\n", seq_idx);
}
mpp_assert(hal_task->packet);
mpp_assert(hal_task->output);
// 11. check hal info update
if (!enc->hal_info_updated) {
update_enc_hal_info(enc);
enc->hal_info_updated = 1;
enc_dbg_detail("task %d update enc hal info success\n", seq_idx);
}
// 12. generate header before hardware stream
if (!hdr_status->ready) {
/* config cpb before generating header */
enc_impl_gen_hdr(enc->impl, enc->hdr_pkt);
enc->hdr_len = mpp_packet_get_length(enc->hdr_pkt);
hdr_status->ready = 1;
enc_dbg_detail("task %d update header length %d\n",
seq_idx, enc->hdr_len);
mpp_packet_append(hal_task->packet, enc->hdr_pkt);
hal_task->header_length = enc->hdr_len;
hal_task->length += enc->hdr_len;
hdr_status->added_by_change = 1;
}
check_hal_task_pkt_len(hal_task, "gen_hdr and adding");
// 13. check frm_meta data force key in input frame and start one frame
if (!status->enc_start) {
enc_dbg_detail("task %d enc start\n", seq_idx);
ENC_RUN_FUNC2(enc_impl_start, enc->impl, hal_task, enc->mpp, ret);
status->enc_start = 1;
}
// 14. setup user_cfg to dpb
if (!status->refs_force_update) {
MppEncRefFrmUsrCfg *frm_cfg = &enc->frm_cfg;
if (frm_cfg->force_flag) {
mpp_enc_refs_set_usr_cfg(enc->refs, frm_cfg);
frm_cfg->force_flag = 0;
}
enc_dbg_detail("task %d refs force update success\n", seq_idx);
status->refs_force_update = 1;
}
// 15. backup dpb
if (!status->enc_backup) {
mpp_enc_refs_stash(enc->refs);
enc_dbg_detail("task %d refs stash success\n", seq_idx);
status->enc_backup = 1;
}
TASK_DONE:
if (ret) {
enc_dbg_detail("task %d terminate\n", seq_idx);
terminate_async_task(enc, async);
}
return ret;
}
static MPP_RET proc_async_task(MppEncImpl *enc, EncAsyncTask *async)
{
Mpp *mpp = (Mpp*)enc->mpp;
EncImpl impl = enc->impl;
MppEncHal hal = enc->enc_hal;
MppEncHeaderStatus *hdr_status = &enc->hdr_status;
EncAsyncStatus *status = &async->info.status;
HalEncTask *hal_task = &async->info.task;
EncRcTask *rc_task = hal_task->rc_task;
EncCpbStatus *cpb = &rc_task->cpb;
EncFrmStatus *frm = &rc_task->frm;
MppPacket packet = hal_task->packet;
RK_U32 seq_idx = async->info.seq_idx;
MPP_RET ret = MPP_OK;
enc_dbg_detail("task %d enc proc dpb\n", seq_idx);
mpp_enc_refs_get_cpb(enc->refs, cpb);
enc_dbg_frm_status("frm %d start ***********************************\n", seq_idx);
ENC_RUN_FUNC2(enc_impl_proc_dpb, impl, hal_task, mpp, ret);
enc_dbg_detail("task %d rc frame start\n", frm->seq_idx);
ENC_RUN_FUNC2(rc_frm_start, enc->rc_ctx, rc_task, mpp, ret);
// 16. generate header before hardware stream
if (enc->hdr_mode == MPP_ENC_HEADER_MODE_EACH_IDR &&
frm->is_intra &&
!hdr_status->added_by_change &&
!hdr_status->added_by_ctrl &&
!hdr_status->added_by_mode) {
enc_dbg_detail("task %d IDR header length %d\n",
frm->seq_idx, enc->hdr_len);
mpp_packet_append(packet, enc->hdr_pkt);
hal_task->header_length = enc->hdr_len;
hal_task->length += enc->hdr_len;
hdr_status->added_by_mode = 1;
}
// check for header adding
check_hal_task_pkt_len(hal_task, "header adding");
/* 17. Add all prefix info before encoding */
if (frm->is_idr && enc->sei_mode >= MPP_ENC_SEI_MODE_ONE_SEQ) {
RK_S32 length = 0;
enc_impl_add_prefix(impl, packet, &length, uuid_version,
enc->version_info, enc->version_length);
hal_task->sei_length += length;
hal_task->length += length;
length = 0;
enc_impl_add_prefix(impl, packet, &length, uuid_rc_cfg,
enc->rc_cfg_info, enc->rc_cfg_length);
hal_task->sei_length += length;
hal_task->length += length;
}
if (mpp_frame_has_meta(hal_task->frame)) {
update_user_datas(impl, packet, hal_task->frame, hal_task);
}
// check for user data adding
check_hal_task_pkt_len(hal_task, "user data adding");
enc_dbg_detail("task %d enc proc hal\n", frm->seq_idx);
ENC_RUN_FUNC2(enc_impl_proc_hal, impl, hal_task, mpp, ret);
enc_dbg_detail("task %d hal get task\n", frm->seq_idx);
ENC_RUN_FUNC2(mpp_enc_hal_get_task, hal, hal_task, mpp, ret);
enc_dbg_detail("task %d rc hal start\n", frm->seq_idx);
ENC_RUN_FUNC2(rc_hal_start, enc->rc_ctx, rc_task, mpp, ret);
enc_dbg_detail("task %d hal generate reg\n", frm->seq_idx);
ENC_RUN_FUNC2(mpp_enc_hal_gen_regs, hal, hal_task, mpp, ret);
mpp_stopwatch_record(hal_task->stopwatch, "encode hal start");
enc_dbg_detail("task %d hal start\n", frm->seq_idx);
ENC_RUN_FUNC2(mpp_enc_hal_start, hal, hal_task, mpp, ret);
status->enc_done = 0;
hal_task_hnd_set_info(async->hnd, &async->info);
hal_task_hnd_set_status(async->hnd, TASK_PROCESSING);
enc_dbg_detail("task %d on processing ret %d\n", frm->seq_idx, ret);
reset_async_task(async);
async->hnd = NULL;
TASK_DONE:
return ret;
}
static MPP_RET check_enc_async_wait(MppEncImpl *enc, EncAsyncWait *wait)
{
MPP_RET ret = MPP_OK;
RK_U32 notify = enc->notify_flag;
RK_U32 last_wait = enc->status_flag;
RK_U32 curr_wait = wait->val;
RK_U32 wait_chg = last_wait & (~curr_wait);
do {
if (enc->reset_flag)
break;
// NOTE: User control should always be processed
if (notify & MPP_ENC_CONTROL)
break;
// NOTE: When condition is not fulfilled check nofify flag again
if (!curr_wait || (curr_wait & notify))
break;
ret = MPP_NOK;
} while (0);
enc_dbg_status("%p %08x -> %08x [%08x] notify %08x -> %s\n", enc,
last_wait, curr_wait, wait_chg, notify, (ret) ? ("wait") : ("work"));
enc->status_flag = wait->val;
enc->notify_flag = 0;
if (ret) {
enc->wait_count++;
} else {
enc->work_count++;
}
return ret;
}
static MPP_RET enc_async_wait_task(MppEncImpl *enc, EncAsyncTaskInfo *info)
{
Mpp *mpp = (Mpp*)enc->mpp;
MppEncHal hal = enc->enc_hal;
HalEncTask *hal_task = &info->task;
EncRcTask *rc_task = hal_task->rc_task;
EncFrmStatus *frm = &info->rc.frm;
MppPacket pkt = hal_task->packet;
MppMeta meta = mpp_packet_get_meta(pkt);
MPP_RET ret = MPP_OK;
enc_dbg_detail("task %d hal wait\n", frm->seq_idx);
ENC_RUN_FUNC2(mpp_enc_hal_wait, hal, hal_task, mpp, ret);
mpp_stopwatch_record(hal_task->stopwatch, "encode hal finish");
enc_dbg_detail("task %d rc hal end\n", frm->seq_idx);
ENC_RUN_FUNC2(rc_hal_end, enc->rc_ctx, rc_task, mpp, ret);
enc_dbg_detail("task %d hal ret task\n", frm->seq_idx);
ENC_RUN_FUNC2(mpp_enc_hal_ret_task, hal, hal_task, mpp, ret);
TASK_DONE:
/* setup output packet and meta data */
mpp_packet_set_length(pkt, hal_task->length);
check_hal_task_pkt_len(hal_task, "hw finish");
mpp_meta_set_s32(meta, KEY_OUTPUT_INTRA, frm->is_intra);
if (rc_task->info.quality_real)
mpp_meta_set_s32(meta, KEY_ENC_AVERAGE_QP, rc_task->info.quality_real);
mpp_meta_set_frame(meta, KEY_INPUT_FRAME, hal_task->frame);
enc_dbg_detail("task %d output packet pts %lld\n", info->seq_idx, info->pts);
if (mpp->mPktOut) {
mpp_list *pkt_out = mpp->mPktOut;
AutoMutex autoLock(pkt_out->mutex());
pkt_out->add_at_tail(&pkt, sizeof(pkt));
mpp->mPacketPutCount++;
pkt_out->signal();
}
return ret;
}
void *mpp_enc_async_thread(void *data)
{
Mpp *mpp = (Mpp*)data;
MppEncImpl *enc = (MppEncImpl *)mpp->mEnc;
MppThread *thd_enc = enc->thread_enc;
HalTaskGroup tasks = NULL;
EncAsyncTask task;
EncAsyncWait wait;
HalTaskHnd hnd = NULL;
MPP_RET ret = MPP_OK;
enc_dbg_func("thread start\n");
reset_async_task(&task);
task.hnd = NULL;
wait.val = 0;
/* init two task for */
ret = hal_task_group_init(&tasks, 2, sizeof(EncAsyncTaskInfo));
if (ret) {
mpp_err_f("hal_task_group_init failed ret %d\n", ret);
goto DONE;;
}
task.tasks = tasks;
while (1) {
{
AutoMutex autolock(thd_enc->mutex());
if (MPP_THREAD_RUNNING != thd_enc->get_status())
break;
if (check_enc_async_wait(enc, &wait)) {
enc_dbg_detail("wait start\n");
thd_enc->wait();
enc_dbg_detail("wait done\n");
}
}
// When encoder is not on encoding process external config and reset
// 1. process user control and reset flag
if (enc->cmd_send != enc->cmd_recv || enc->reset_flag) {
enc_dbg_detail("ctrl proc %d cmd %08x\n", enc->cmd_recv, enc->cmd);
// wait all tasks done
while (MPP_OK == hal_task_get_hnd(tasks, TASK_PROCESSING, &hnd)) {
EncAsyncTaskInfo info;
hal_task_hnd_get_info(hnd, &info);
mpp_assert(!info.status.enc_done);
enc_async_wait_task(enc, &info);
hal_task_hnd_set_status(hnd, TASK_IDLE);
wait.task_hnd = 0;
}
if (enc->cmd_send != enc->cmd_recv) {
sem_wait(&enc->cmd_start);
ret = mpp_enc_proc_cfg(enc, enc->cmd, enc->param);
if (ret)
*enc->cmd_ret = ret;
enc->cmd_recv++;
enc_dbg_detail("ctrl proc %d done send %d\n", enc->cmd_recv,
enc->cmd_send);
mpp_assert(enc->cmd_send == enc->cmd_send);
enc->param = NULL;
enc->cmd = (MpiCmd)0;
sem_post(&enc->cmd_done);
// async cfg update process for hal
// mainly for buffer prepare
mpp_enc_hal_prepare(enc->enc_hal);
/* NOTE: here will clear change flag of rc and prep cfg */
mpp_enc_proc_rc_update(enc);
continue;
}
if (enc->reset_flag) {
enc_dbg_detail("thread reset start\n");
{
AutoMutex autolock(thd_enc->mutex());
enc->status_flag = 0;
}
AutoMutex autolock(thd_enc->mutex(THREAD_CONTROL));
enc->reset_flag = 0;
sem_post(&enc->enc_reset);
enc_dbg_detail("thread reset done\n");
continue;
}
}
// 2. try get a task to encode
ret = try_get_async_task(enc, &task, &wait);
enc_dbg_detail("try_get_async_task ret %d\n", ret);
if (ret) {
hal_task_get_hnd(tasks, TASK_PROCESSING, &hnd);
if (hnd) {
EncAsyncTaskInfo info;
hal_task_hnd_get_info(hnd, &info);
mpp_assert(!info.status.enc_done);
enc_async_wait_task(enc, &info);
hal_task_hnd_set_status(hnd, TASK_IDLE);
wait.task_hnd = 0;
}
continue;
}
mpp_assert(task.info.task.valid);
proc_async_task(enc, &task);
}
DONE:
enc_dbg_func("loop done\n");
if (tasks) {
ret = hal_task_group_deinit(tasks);
tasks = NULL;
}
enc_dbg_func("thread finish\n");
return NULL;
}

View File

@@ -217,6 +217,24 @@ MPP_RET mpp_enc_start_v2(MppEnc ctx)
return MPP_OK;
}
MPP_RET mpp_enc_start_async(MppEnc ctx)
{
MppEncImpl *enc = (MppEncImpl *)ctx;
char name[16];
enc_dbg_func("%p in\n", enc);
snprintf(name, sizeof(name) - 1, "mpp_%se_%d",
strof_coding_type(enc->coding), getpid());
enc->thread_enc = new MppThread(mpp_enc_async_thread, enc->mpp, name);
enc->thread_enc->start();
enc_dbg_func("%p out\n", enc);
return MPP_OK;
}
MPP_RET mpp_enc_stop_v2(MppEnc ctx)
{
MPP_RET ret = MPP_OK;

View File

@@ -122,8 +122,10 @@ public:
MPP_RET notify(RK_U32 flag);
MPP_RET notify(MppBufferGroup group);
mpp_list *mPackets;
mpp_list *mFrames;
mpp_list *mPktIn;
mpp_list *mPktOut;
mpp_list *mFrmIn;
mpp_list *mFrmOut;
/* counters for debug */
RK_U32 mPacketPutCount;
RK_U32 mPacketGetCount;
@@ -176,7 +178,7 @@ public:
MppDec mDec;
MppEnc mEnc;
RK_U32 mEncVersion;
RK_U32 mEncAyncIo;
private:
void clear();
@@ -208,6 +210,10 @@ private:
MPP_RET control_enc(MpiCmd cmd, MppParam param);
MPP_RET control_isp(MpiCmd cmd, MppParam param);
/* for special encoder async io mode */
MPP_RET put_frame_async(MppFrame frame);
MPP_RET get_packet_async(MppPacket *packet);
Mpp(const Mpp &);
Mpp &operator=(const Mpp &);
};

View File

@@ -41,6 +41,8 @@
#define MPP_TEST_FRAME_SIZE SZ_1M
#define MPP_TEST_PACKET_SIZE SZ_512K
typedef MPP_RET (*QueueFunc)(Mpp* mpp, MppPortType type, MppTask *task);
static void mpp_notify_by_buffer_group(void *arg, void *group)
{
Mpp *mpp = (Mpp *)arg;
@@ -61,8 +63,10 @@ static void *list_wraper_frame(void *arg)
}
Mpp::Mpp(MppCtx ctx = NULL)
: mPackets(NULL),
mFrames(NULL),
: mPktIn(NULL),
mPktOut(NULL),
mFrmIn(NULL),
mFrmOut(NULL),
mPacketPutCount(0),
mPacketGetCount(0),
mFramePutCount(0),
@@ -85,7 +89,7 @@ Mpp::Mpp(MppCtx ctx = NULL)
mCtx(ctx),
mDec(NULL),
mEnc(NULL),
mEncVersion(0),
mEncAyncIo(0),
mType(MPP_CTX_BUTT),
mCoding(MPP_VIDEO_CodingUnused),
mInitDone(0),
@@ -131,8 +135,8 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding)
switch (mType) {
case MPP_CTX_DEC : {
mPackets = new mpp_list(list_wraper_packet);
mFrames = new mpp_list(list_wraper_frame);
mPktIn = new mpp_list(list_wraper_packet);
mFrmOut = new mpp_list(list_wraper_frame);
if (mInputTimeout == MPP_POLL_BUTT)
mInputTimeout = MPP_POLL_NON_BLOCK;
@@ -171,8 +175,10 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding)
mInitDone = 1;
} break;
case MPP_CTX_ENC : {
mFrames = new mpp_list(NULL);
mPackets = new mpp_list(list_wraper_packet);
mPktIn = new mpp_list(list_wraper_packet);
mPktOut = new mpp_list(list_wraper_packet);
mFrmIn = new mpp_list(NULL);
mFrmOut = new mpp_list(NULL);
if (mInputTimeout == MPP_POLL_BUTT)
mInputTimeout = MPP_POLL_BLOCK;
@@ -200,7 +206,14 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding)
ret = mpp_enc_init_v2(&mEnc, &cfg);
if (ret)
break;
ret = mpp_enc_start_v2(mEnc);
if (mInputTimeout == MPP_POLL_NON_BLOCK) {
mEncAyncIo = 1;
ret = mpp_enc_start_async(mEnc);
} else {
ret = mpp_enc_start_v2(mEnc);
}
if (ret)
break;
mInitDone = 1;
@@ -264,13 +277,21 @@ void Mpp::clear()
mExtraPacket = NULL;
}
if (mPackets) {
delete mPackets;
mPackets = NULL;
if (mPktIn) {
delete mPktIn;
mPktIn = NULL;
}
if (mFrames) {
delete mFrames;
mFrames = NULL;
if (mPktOut) {
delete mPktOut;
mPktOut = NULL;
}
if (mFrmIn) {
delete mFrmIn;
mFrmIn = NULL;
}
if (mFrmOut) {
delete mFrmOut;
mFrmOut = NULL;
}
if (mPacketGroup) {
@@ -421,16 +442,16 @@ MPP_RET Mpp::get_frame(MppFrame *frame)
if (!mInitDone)
return MPP_ERR_INIT;
AutoMutex autoFrameLock(mFrames->mutex());
AutoMutex autoFrameLock(mFrmOut->mutex());
MppFrame first = NULL;
if (0 == mFrames->list_size()) {
if (0 == mFrmOut->list_size()) {
if (mOutputTimeout) {
if (mOutputTimeout < 0) {
/* block wait */
mFrames->wait();
mFrmOut->wait();
} else {
RK_S32 ret = mFrames->wait(mOutputTimeout);
RK_S32 ret = mFrmOut->wait(mOutputTimeout);
if (ret) {
if (ret == ETIMEDOUT)
return MPP_ERR_TIMEOUT;
@@ -444,16 +465,16 @@ MPP_RET Mpp::get_frame(MppFrame *frame)
}
}
if (mFrames->list_size()) {
mFrames->del_at_head(&first, sizeof(frame));
if (mFrmOut->list_size()) {
mFrmOut->del_at_head(&first, sizeof(frame));
mFrameGetCount++;
notify(MPP_OUTPUT_DEQUEUE);
if (mMultiFrame) {
MppFrame prev = first;
MppFrame next = NULL;
while (mFrames->list_size()) {
mFrames->del_at_head(&next, sizeof(frame));
while (mFrmOut->list_size()) {
mFrmOut->del_at_head(&next, sizeof(frame));
mFrameGetCount++;
notify(MPP_OUTPUT_DEQUEUE);
mpp_frame_set_next(prev, next);
@@ -467,8 +488,8 @@ MPP_RET Mpp::get_frame(MppFrame *frame)
// There is no way to wake up parser thread to continue decoding.
// The put_packet only signal sem on may be it better to use sem on info
// change too.
AutoMutex autoPacketLock(mPackets->mutex());
if (mPackets->list_size())
AutoMutex autoPacketLock(mPktIn->mutex());
if (mPktIn->list_size())
notify(MPP_INPUT_ENQUEUE);
}
@@ -485,6 +506,9 @@ MPP_RET Mpp::put_frame(MppFrame frame)
if (!mInitDone)
return MPP_ERR_INIT;
if (mEncAyncIo)
return put_frame_async(frame);
MPP_RET ret = MPP_NOK;
MppStopwatch stopwatch = NULL;
@@ -586,6 +610,9 @@ MPP_RET Mpp::get_packet(MppPacket *packet)
if (!mInitDone)
return MPP_ERR_INIT;
if (mEncAyncIo)
return get_packet_async(packet);
MPP_RET ret = MPP_OK;
MppTask task = NULL;
@@ -626,6 +653,65 @@ RET:
return ret;
}
MPP_RET Mpp::put_frame_async(MppFrame frame)
{
if (mFrmIn) {
AutoMutex autoLock(mFrmIn->mutex());
if (mFrmIn->list_size())
return MPP_NOK;
mFrmIn->add_at_tail(&frame, sizeof(frame));
mFramePutCount++;
notify(MPP_INPUT_ENQUEUE);
}
return MPP_OK;
}
MPP_RET Mpp::get_packet_async(MppPacket *packet)
{
MppPacket pkt = NULL;
AutoMutex autoPacketLock(mPktOut->mutex());
if (0 == mPktOut->list_size()) {
if (mOutputTimeout) {
if (mOutputTimeout < 0) {
/* block wait */
mPktOut->wait();
} else {
RK_S32 ret = mPktOut->wait(mOutputTimeout);
if (ret) {
if (ret == ETIMEDOUT)
return MPP_ERR_TIMEOUT;
else
return MPP_NOK;
}
}
} else {
/* NOTE: in non-block mode the sleep is to avoid user's dead loop */
msleep(1);
}
}
if (mPktOut->list_size()) {
mPktOut->del_at_head(&pkt, sizeof(pkt));
mPacketGetCount++;
notify(MPP_OUTPUT_DEQUEUE);
} else {
AutoMutex autoFrameLock(mFrmIn->mutex());
if (mFrmIn->list_size())
notify(MPP_INPUT_ENQUEUE);
}
*packet = pkt;
return MPP_OK;
}
MPP_RET Mpp::poll(MppPortType type, MppPollType timeout)
{
if (!mInitDone)
@@ -783,10 +869,10 @@ MPP_RET Mpp::reset()
* To avoid this case happen we need to save it on reset beginning
* then restore it on reset end.
*/
mPackets->lock();
while (mPackets->list_size()) {
mPktIn->lock();
while (mPktIn->list_size()) {
MppPacket pkt = NULL;
mPackets->del_at_head(&pkt, sizeof(pkt));
mPktIn->del_at_head(&pkt, sizeof(pkt));
mPacketGetCount++;
RK_U32 flags = mpp_packet_get_flag(pkt);
@@ -799,24 +885,24 @@ MPP_RET Mpp::reset()
mpp_packet_deinit(&pkt);
}
}
mPackets->flush();
mPackets->unlock();
mPktIn->flush();
mPktIn->unlock();
mpp_dec_reset(mDec);
mFrames->lock();
mFrames->flush();
mFrames->unlock();
mFrmOut->lock();
mFrmOut->flush();
mFrmOut->unlock();
} else {
mFrames->lock();
mFrames->flush();
mFrames->unlock();
mFrmOut->lock();
mFrmOut->flush();
mFrmOut->unlock();
mpp_enc_reset_v2(mEnc);
mPackets->lock();
mPackets->flush();
mPackets->unlock();
mPktIn->lock();
mPktIn->flush();
mPktIn->unlock();
}
return MPP_OK;
@@ -967,8 +1053,8 @@ MPP_RET Mpp::control_dec(MpiCmd cmd, MppParam param)
ret = mpp_dec_set_cfg_by_cmd(&mDecInitcfg, cmd, param);
} break;
case MPP_DEC_GET_STREAM_COUNT: {
AutoMutex autoLock(mPackets->mutex());
*((RK_S32 *)param) = mPackets->list_size();
AutoMutex autoLock(mPktIn->mutex());
*((RK_S32 *)param) = mPktIn->list_size();
ret = MPP_OK;
} break;
case MPP_DEC_GET_VPUMEM_USED_COUNT :

View File

@@ -74,7 +74,7 @@ typedef struct MppDecVprocCtxImpl_t {
static void dec_vproc_put_frame(Mpp *mpp, MppFrame frame, MppBuffer buf, RK_S64 pts, RK_U32 err)
{
mpp_list *list = mpp->mFrames;
mpp_list *list = mpp->mFrmOut;
MppFrame out = NULL;
MppFrameImpl *impl = NULL;