diff --git a/mpp/codec/inc/mpp_enc.h b/mpp/codec/inc/mpp_enc.h index d1f8eeee..07eb3f14 100644 --- a/mpp/codec/inc/mpp_enc.h +++ b/mpp/codec/inc/mpp_enc.h @@ -165,6 +165,7 @@ MPP_RET mpp_enc_init_v2(MppEnc *ctx, MppEncInitCfg *cfg); MPP_RET mpp_enc_deinit_v2(MppEnc ctx); MPP_RET mpp_enc_start_v2(MppEnc ctx); +MPP_RET mpp_enc_start_async(MppEnc ctx); MPP_RET mpp_enc_stop_v2(MppEnc ctx); MPP_RET mpp_enc_control_v2(MppEnc ctx, MpiCmd cmd, void *param); diff --git a/mpp/codec/inc/mpp_enc_impl.h b/mpp/codec/inc/mpp_enc_impl.h index 6783537d..247ef353 100644 --- a/mpp/codec/inc/mpp_enc_impl.h +++ b/mpp/codec/inc/mpp_enc_impl.h @@ -138,6 +138,7 @@ extern "C" { #endif void *mpp_enc_thread(void *data); +void *mpp_enc_async_thread(void *data); #ifdef __cplusplus } diff --git a/mpp/codec/mpp_dec.cpp b/mpp/codec/mpp_dec.cpp index e52c53cc..104d577e 100644 --- a/mpp/codec/mpp_dec.cpp +++ b/mpp/codec/mpp_dec.cpp @@ -631,7 +631,7 @@ static void mpp_dec_put_frame(Mpp *mpp, RK_S32 index, HalDecTaskFlag flags) dec_vproc_signal(dec->vproc); } else { // direct output -> copy a new MppFrame and output - mpp_list *list = mpp->mFrames; + mpp_list *list = mpp->mFrmOut; MppFrame out = NULL; mpp_frame_init(&out); @@ -954,8 +954,8 @@ static MPP_RET try_proc_dec_task(Mpp *mpp, DecTask *task) dec_dbg_detail("detail: %p check prev task pass\n", dec); /* too many frame delay in dispaly queue */ - if (mpp->mFrames) { - task->wait.dis_que_full = (mpp->mFrames->list_size() > 4) ? 1 : 0; + if (mpp->mFrmOut) { + task->wait.dis_que_full = (mpp->mFrmOut->list_size() > 4) ? 1 : 0; if (task->wait.dis_que_full) return MPP_ERR_DISPLAY_FULL; } diff --git a/mpp/codec/mpp_enc_impl.cpp b/mpp/codec/mpp_enc_impl.cpp index 5f3d994d..0c7eedf9 100644 --- a/mpp/codec/mpp_enc_impl.cpp +++ b/mpp/codec/mpp_enc_impl.cpp @@ -2097,3 +2097,717 @@ void *mpp_enc_thread(void *data) return NULL; } + +/* encoder internal work flow */ +typedef union EncAsyncStatus_u { + RK_U32 val; + struct { + RK_U32 task_hnd_rdy : 1; + RK_U32 task_in_rdy : 1; + RK_U32 task_out_rdy : 1; + + RK_U32 frm_pkt_rdy : 1; + + RK_U32 hal_task_reset_rdy : 1; // reset hal task to start + RK_U32 rc_check_frm_drop : 1; // rc stage + RK_U32 pkt_buf_rdy : 1; // prepare pkt buf + + RK_U32 enc_start : 1; // enc stage + RK_U32 refs_force_update : 1; // enc stage + RK_U32 low_delay_again : 1; // enc stage low delay output again + + RK_U32 enc_backup : 1; // enc stage + RK_U32 enc_restore : 1; // reenc flow start point + RK_U32 enc_proc_dpb : 1; // enc stage + RK_U32 rc_frm_start : 1; // rc stage + RK_U32 check_type_reenc : 1; // flow checkpoint if reenc -> enc_restore + RK_U32 enc_proc_hal : 1; // enc stage + RK_U32 hal_get_task : 1; // hal stage + RK_U32 rc_hal_start : 1; // rc stage + RK_U32 hal_gen_reg : 1; // hal stage + RK_U32 hal_start : 1; // hal stage + RK_U32 hal_wait : 1; // hal stage NOTE: special in low delay mode + RK_U32 rc_hal_end : 1; // rc stage + RK_U32 hal_ret_task : 1; // hal stage + RK_U32 enc_update_hal : 1; // enc stage + RK_U32 rc_frm_end : 1; // rc stage + RK_U32 check_rc_reenc : 1; // flow checkpoint if reenc -> enc_restore + RK_U32 enc_done : 1; // done stage + RK_U32 slice_out_done : 1; + }; +} EncAsyncStatus; + +typedef union EncAsyncWait_u { + RK_U32 val; + struct { + RK_U32 enc_frm_in : 1; // 0x0001 MPP_ENC_NOTIFY_FRAME_ENQUEUE + RK_U32 reserv0002 : 1; // 0x0002 + RK_U32 reserv0004 : 1; // 0x0004 + RK_U32 enc_pkt_out : 1; // 0x0008 MPP_ENC_NOTIFY_PACKET_ENQUEUE + + RK_U32 task_hnd : 1; // 0x0010 + RK_U32 reserv0020 : 1; // 0x0020 + RK_U32 reserv0040 : 1; // 0x0040 + RK_U32 reserv0080 : 1; // 0x0080 + + RK_U32 reserv0100 : 1; // 0x0100 + RK_U32 reserv0200 : 1; // 0x0200 + RK_U32 reserv0400 : 1; // 0x0400 + RK_U32 reserv0800 : 1; // 0x0800 + + RK_U32 reserv1000 : 1; // 0x1000 + RK_U32 reserv2000 : 1; // 0x2000 + RK_U32 reserv4000 : 1; // 0x4000 + RK_U32 reserv8000 : 1; // 0x8000 + }; +} EncAsyncWait; + +typedef struct EncAsyncTaskInfo_t { + RK_S32 seq_idx; + EncAsyncStatus status; + RK_S64 pts; + + HalEncTask task; + EncRcTask rc; + MppEncRefFrmUsrCfg usr; +} EncAsyncTaskInfo; + +typedef struct EncAsyncTask_t { + HalTaskGroup tasks; + HalTaskHnd hnd; + + EncAsyncTaskInfo info; +} EncAsyncTask; + +static void reset_async_task(EncAsyncTask *task) +{ + memset(&task->info, 0, sizeof(task->info)); + task->info.task.rc_task = &task->info.rc; + task->info.task.frm_cfg = &task->info.usr; +} + +static void terminate_async_task(MppEncImpl *enc, EncAsyncTask *async) +{ + HalEncTask *hal_task = &async->info.task; + EncFrmStatus *frm = &enc->rc_task.frm; + Mpp *mpp = (Mpp *)enc->mpp; + + mpp_stopwatch_record(hal_task->stopwatch, "encode task done"); + + if (hal_task->packet) { + MppPacket pkt = hal_task->packet; + + /* setup output packet and meta data */ + mpp_packet_set_length(pkt, hal_task->length); + + /* + * First return output packet. + * Then enqueue task back to input port. + * Final user will release the mpp_frame they had input. + */ + enc_dbg_detail("task %d enqueue packet pts %lld\n", frm->seq_idx, enc->task_pts); + + if (mpp->mPktOut) { + mpp_list *pkt_out = mpp->mPktOut; + + if (enc->frame) { + MppMeta meta = mpp_packet_get_meta(pkt); + MppStopwatch stopwatch = mpp_frame_get_stopwatch(enc->frame); + + mpp_stopwatch_record(stopwatch, "encode task terminate"); + + mpp_meta_set_frame(meta, KEY_INPUT_FRAME, enc->frame); + enc->frame = NULL; + } + + AutoMutex autolock(pkt_out->mutex()); + + pkt_out->add_at_tail(&pkt, sizeof(pkt)); + mpp->mPacketPutCount++; + pkt_out->signal(); + mpp_assert(pkt); + + enc_dbg_detail("packet out ready\n"); + } + } + + reset_async_task(async); +} + +static MPP_RET check_async_frm_pkt(EncAsyncTask *async) +{ + HalEncTask *hal_task = &async->info.task; + MppPacket packet = hal_task->packet; + MppFrame frame = hal_task->frame; + + hal_task->input = NULL; + hal_task->output = NULL; + + if (packet) + hal_task->output = mpp_packet_get_buffer(packet); + else { + mpp_packet_new(&packet); + hal_task->packet = packet; + } + + if (frame) { + RK_S64 pts = mpp_frame_get_pts(frame); + MppBuffer frm_buf = mpp_frame_get_buffer(frame); + + hal_task->input = frm_buf; + + mpp_packet_set_pts(packet, pts); + + if (mpp_frame_get_eos(frame)) + mpp_packet_set_eos(packet); + else + mpp_packet_clr_eos(packet); + } + + return (NULL == frame || NULL == hal_task->input) ? MPP_NOK : MPP_OK; +} + +static MPP_RET check_async_pkt_buf(MppEncImpl *enc, EncAsyncTask *async) +{ + HalEncTask *hal_task = &async->info.task; + + if (NULL == hal_task->output) { + /* NOTE: set buffer w * h * 1.5 to avoid buffer overflow */ + Mpp *mpp = (Mpp *)enc->mpp; + MppEncPrepCfg *prep = &enc->cfg.prep; + RK_U32 width = MPP_ALIGN(prep->width, 16); + RK_U32 height = MPP_ALIGN(prep->height, 16); + RK_U32 size = (enc->coding == MPP_VIDEO_CodingMJPEG) ? + (width * height * 3 / 2) : (width * height); + MppPacketImpl *pkt = (MppPacketImpl *)hal_task->packet; + MppBuffer buffer = NULL; + + mpp_assert(size); + mpp_buffer_get(mpp->mPacketGroup, &buffer, size); + mpp_assert(buffer); + enc->pkt_buf = buffer; + pkt->data = mpp_buffer_get_ptr(buffer); + pkt->pos = pkt->data; + pkt->size = mpp_buffer_get_size(buffer); + pkt->length = 0; + pkt->buffer = buffer; + hal_task->output = buffer; + + enc_dbg_detail("create output pkt %p buf %p\n", hal_task->packet, buffer); + } else { + enc_dbg_detail("output to pkt %p buf %p pos %p length %d\n", + hal_task->packet, hal_task->output, + mpp_packet_get_pos(hal_task->packet), + mpp_packet_get_length(hal_task->packet)); + } + + return MPP_OK; +} + +static MPP_RET try_get_async_task(MppEncImpl *enc, EncAsyncTask *async, EncAsyncWait *wait) +{ + Mpp *mpp = (Mpp *)enc->mpp; + EncRcTask *rc_task = &enc->rc_task; + EncFrmStatus *frm = &rc_task->frm; + MppEncHeaderStatus *hdr_status = &enc->hdr_status; + HalEncTask *hal_task = &async->info.task; + EncAsyncStatus *status = &async->info.status; + MppStopwatch stopwatch = NULL; + MppPacket packet = hal_task->packet; + MppFrame frame = hal_task->frame; + RK_U32 seq_idx = 0; + MPP_RET ret = MPP_OK; + + if (NULL == async->hnd) { + hal_task_get_hnd(async->tasks, TASK_IDLE, &async->hnd); + if (async->hnd) { + status->task_hnd_rdy = 1; + wait->task_hnd = 0; + enc_dbg_detail("get hnd success\n"); + reset_async_task(async); + } else { + wait->task_hnd = 1; + enc_dbg_detail("get hnd failed\n"); + return MPP_NOK; + } + } + + if (NULL == frame) { + if (mpp->mFrmIn) { + mpp_list *frm_in = mpp->mFrmIn; + AutoMutex autolock(frm_in->mutex()); + + if (frm_in->list_size()) { + frm_in->del_at_head(&frame, sizeof(frame)); + mpp->mFrameGetCount++; + + mpp_assert(frame); + + status->task_in_rdy = 1; + wait->enc_frm_in = 0; + + enc_dbg_detail("get input frame success\n"); + + stopwatch = mpp_frame_get_stopwatch(frame); + mpp_stopwatch_record(stopwatch, "encode task start"); + + hal_task->frame = frame; + } + } + + if (NULL == frame) { + enc_dbg_detail("get input frame failed\n"); + wait->enc_frm_in = 1; + return MPP_NOK; + } + } + + if (NULL == packet) { + if (mpp_frame_has_meta(frame)) { + MppMeta meta = mpp_frame_get_meta(frame); + + mpp_meta_get_packet(meta, KEY_OUTPUT_PACKET, &packet); + } + + if (packet) { + status->task_out_rdy = 1; + wait->enc_pkt_out = 0; + + enc_dbg_detail("get output packet success\n"); + + hal_task->packet = packet; + } + } + + mpp_assert(frame); + + stopwatch = mpp_frame_get_stopwatch(frame); + + /* + * 6. get frame and packet for one task + * If there is no input frame just return empty packet task + */ + if (!status->frm_pkt_rdy) { + if (check_async_frm_pkt(async)) { + mpp_stopwatch_record(stopwatch, "invalid on check frm pkt"); + reset_hal_enc_task(hal_task); + ret = MPP_NOK; + goto TASK_DONE; + } + + status->frm_pkt_rdy = 1; + hal_task->stopwatch = stopwatch; + enc_dbg_detail("task frame packet ready\n"); + + async->info.seq_idx = enc->task_idx++; + async->info.pts = mpp_frame_get_pts(hal_task->frame); + + rc_task->frame = async->info.task.frame; + enc_dbg_detail("task seq idx %d start\n", seq_idx); + } + + seq_idx = async->info.seq_idx; + + // 9. check frame drop by frame rate conversion + if (!status->rc_check_frm_drop) { + ENC_RUN_FUNC2(rc_frm_check_drop, enc->rc_ctx, rc_task, enc->mpp, ret); + status->rc_check_frm_drop = 1; + enc_dbg_detail("task %d drop %d\n", seq_idx, frm->drop); + + // when the frame should be dropped just return empty packet + if (frm->drop) { + mpp_stopwatch_record(stopwatch, "invalid on frame rate drop"); + hal_task->valid = 0; + hal_task->length = 0; + ret = MPP_NOK; + goto TASK_DONE; + } + hal_task->valid = 1; + } + + // start encoder task process here + mpp_assert(hal_task->valid); + + // 10. check and create packet for output + if (!status->pkt_buf_rdy) { + check_async_pkt_buf(enc, async); + status->pkt_buf_rdy = 1; + + enc_dbg_detail("task %d check pkt buffer success\n", seq_idx); + } + mpp_assert(hal_task->packet); + mpp_assert(hal_task->output); + + // 11. check hal info update + if (!enc->hal_info_updated) { + update_enc_hal_info(enc); + enc->hal_info_updated = 1; + + enc_dbg_detail("task %d update enc hal info success\n", seq_idx); + } + + // 12. generate header before hardware stream + if (!hdr_status->ready) { + /* config cpb before generating header */ + enc_impl_gen_hdr(enc->impl, enc->hdr_pkt); + enc->hdr_len = mpp_packet_get_length(enc->hdr_pkt); + hdr_status->ready = 1; + + enc_dbg_detail("task %d update header length %d\n", + seq_idx, enc->hdr_len); + + mpp_packet_append(hal_task->packet, enc->hdr_pkt); + hal_task->header_length = enc->hdr_len; + hal_task->length += enc->hdr_len; + hdr_status->added_by_change = 1; + } + + check_hal_task_pkt_len(hal_task, "gen_hdr and adding"); + + // 13. check frm_meta data force key in input frame and start one frame + if (!status->enc_start) { + enc_dbg_detail("task %d enc start\n", seq_idx); + ENC_RUN_FUNC2(enc_impl_start, enc->impl, hal_task, enc->mpp, ret); + status->enc_start = 1; + } + + // 14. setup user_cfg to dpb + if (!status->refs_force_update) { + MppEncRefFrmUsrCfg *frm_cfg = &enc->frm_cfg; + + if (frm_cfg->force_flag) { + mpp_enc_refs_set_usr_cfg(enc->refs, frm_cfg); + frm_cfg->force_flag = 0; + } + + enc_dbg_detail("task %d refs force update success\n", seq_idx); + status->refs_force_update = 1; + } + + // 15. backup dpb + if (!status->enc_backup) { + mpp_enc_refs_stash(enc->refs); + enc_dbg_detail("task %d refs stash success\n", seq_idx); + status->enc_backup = 1; + } + +TASK_DONE: + if (ret) { + enc_dbg_detail("task %d terminate\n", seq_idx); + terminate_async_task(enc, async); + } + + return ret; +} + +static MPP_RET proc_async_task(MppEncImpl *enc, EncAsyncTask *async) +{ + Mpp *mpp = (Mpp*)enc->mpp; + EncImpl impl = enc->impl; + MppEncHal hal = enc->enc_hal; + MppEncHeaderStatus *hdr_status = &enc->hdr_status; + EncAsyncStatus *status = &async->info.status; + HalEncTask *hal_task = &async->info.task; + EncRcTask *rc_task = hal_task->rc_task; + EncCpbStatus *cpb = &rc_task->cpb; + EncFrmStatus *frm = &rc_task->frm; + MppPacket packet = hal_task->packet; + RK_U32 seq_idx = async->info.seq_idx; + MPP_RET ret = MPP_OK; + + enc_dbg_detail("task %d enc proc dpb\n", seq_idx); + mpp_enc_refs_get_cpb(enc->refs, cpb); + + enc_dbg_frm_status("frm %d start ***********************************\n", seq_idx); + ENC_RUN_FUNC2(enc_impl_proc_dpb, impl, hal_task, mpp, ret); + + enc_dbg_detail("task %d rc frame start\n", frm->seq_idx); + ENC_RUN_FUNC2(rc_frm_start, enc->rc_ctx, rc_task, mpp, ret); + + // 16. generate header before hardware stream + if (enc->hdr_mode == MPP_ENC_HEADER_MODE_EACH_IDR && + frm->is_intra && + !hdr_status->added_by_change && + !hdr_status->added_by_ctrl && + !hdr_status->added_by_mode) { + enc_dbg_detail("task %d IDR header length %d\n", + frm->seq_idx, enc->hdr_len); + + mpp_packet_append(packet, enc->hdr_pkt); + + hal_task->header_length = enc->hdr_len; + hal_task->length += enc->hdr_len; + hdr_status->added_by_mode = 1; + } + + // check for header adding + check_hal_task_pkt_len(hal_task, "header adding"); + + /* 17. Add all prefix info before encoding */ + if (frm->is_idr && enc->sei_mode >= MPP_ENC_SEI_MODE_ONE_SEQ) { + RK_S32 length = 0; + + enc_impl_add_prefix(impl, packet, &length, uuid_version, + enc->version_info, enc->version_length); + + hal_task->sei_length += length; + hal_task->length += length; + + length = 0; + enc_impl_add_prefix(impl, packet, &length, uuid_rc_cfg, + enc->rc_cfg_info, enc->rc_cfg_length); + + hal_task->sei_length += length; + hal_task->length += length; + } + + if (mpp_frame_has_meta(hal_task->frame)) { + update_user_datas(impl, packet, hal_task->frame, hal_task); + } + + // check for user data adding + check_hal_task_pkt_len(hal_task, "user data adding"); + + enc_dbg_detail("task %d enc proc hal\n", frm->seq_idx); + ENC_RUN_FUNC2(enc_impl_proc_hal, impl, hal_task, mpp, ret); + + enc_dbg_detail("task %d hal get task\n", frm->seq_idx); + ENC_RUN_FUNC2(mpp_enc_hal_get_task, hal, hal_task, mpp, ret); + + enc_dbg_detail("task %d rc hal start\n", frm->seq_idx); + ENC_RUN_FUNC2(rc_hal_start, enc->rc_ctx, rc_task, mpp, ret); + + enc_dbg_detail("task %d hal generate reg\n", frm->seq_idx); + ENC_RUN_FUNC2(mpp_enc_hal_gen_regs, hal, hal_task, mpp, ret); + + mpp_stopwatch_record(hal_task->stopwatch, "encode hal start"); + enc_dbg_detail("task %d hal start\n", frm->seq_idx); + ENC_RUN_FUNC2(mpp_enc_hal_start, hal, hal_task, mpp, ret); + + status->enc_done = 0; + hal_task_hnd_set_info(async->hnd, &async->info); + hal_task_hnd_set_status(async->hnd, TASK_PROCESSING); + enc_dbg_detail("task %d on processing ret %d\n", frm->seq_idx, ret); + + reset_async_task(async); + async->hnd = NULL; + +TASK_DONE: + return ret; +} + +static MPP_RET check_enc_async_wait(MppEncImpl *enc, EncAsyncWait *wait) +{ + MPP_RET ret = MPP_OK; + RK_U32 notify = enc->notify_flag; + RK_U32 last_wait = enc->status_flag; + RK_U32 curr_wait = wait->val; + RK_U32 wait_chg = last_wait & (~curr_wait); + + do { + if (enc->reset_flag) + break; + + // NOTE: User control should always be processed + if (notify & MPP_ENC_CONTROL) + break; + + // NOTE: When condition is not fulfilled check nofify flag again + if (!curr_wait || (curr_wait & notify)) + break; + + ret = MPP_NOK; + } while (0); + + enc_dbg_status("%p %08x -> %08x [%08x] notify %08x -> %s\n", enc, + last_wait, curr_wait, wait_chg, notify, (ret) ? ("wait") : ("work")); + + enc->status_flag = wait->val; + enc->notify_flag = 0; + + if (ret) { + enc->wait_count++; + } else { + enc->work_count++; + } + + return ret; +} + +static MPP_RET enc_async_wait_task(MppEncImpl *enc, EncAsyncTaskInfo *info) +{ + Mpp *mpp = (Mpp*)enc->mpp; + MppEncHal hal = enc->enc_hal; + HalEncTask *hal_task = &info->task; + EncRcTask *rc_task = hal_task->rc_task; + EncFrmStatus *frm = &info->rc.frm; + MppPacket pkt = hal_task->packet; + MppMeta meta = mpp_packet_get_meta(pkt); + MPP_RET ret = MPP_OK; + + enc_dbg_detail("task %d hal wait\n", frm->seq_idx); + ENC_RUN_FUNC2(mpp_enc_hal_wait, hal, hal_task, mpp, ret); + + mpp_stopwatch_record(hal_task->stopwatch, "encode hal finish"); + + enc_dbg_detail("task %d rc hal end\n", frm->seq_idx); + ENC_RUN_FUNC2(rc_hal_end, enc->rc_ctx, rc_task, mpp, ret); + + enc_dbg_detail("task %d hal ret task\n", frm->seq_idx); + ENC_RUN_FUNC2(mpp_enc_hal_ret_task, hal, hal_task, mpp, ret); + +TASK_DONE: + /* setup output packet and meta data */ + mpp_packet_set_length(pkt, hal_task->length); + + check_hal_task_pkt_len(hal_task, "hw finish"); + + mpp_meta_set_s32(meta, KEY_OUTPUT_INTRA, frm->is_intra); + if (rc_task->info.quality_real) + mpp_meta_set_s32(meta, KEY_ENC_AVERAGE_QP, rc_task->info.quality_real); + + mpp_meta_set_frame(meta, KEY_INPUT_FRAME, hal_task->frame); + + enc_dbg_detail("task %d output packet pts %lld\n", info->seq_idx, info->pts); + + if (mpp->mPktOut) { + mpp_list *pkt_out = mpp->mPktOut; + + AutoMutex autoLock(pkt_out->mutex()); + + pkt_out->add_at_tail(&pkt, sizeof(pkt)); + mpp->mPacketPutCount++; + pkt_out->signal(); + } + + return ret; +} + +void *mpp_enc_async_thread(void *data) +{ + Mpp *mpp = (Mpp*)data; + MppEncImpl *enc = (MppEncImpl *)mpp->mEnc; + MppThread *thd_enc = enc->thread_enc; + HalTaskGroup tasks = NULL; + EncAsyncTask task; + EncAsyncWait wait; + HalTaskHnd hnd = NULL; + MPP_RET ret = MPP_OK; + + enc_dbg_func("thread start\n"); + + reset_async_task(&task); + task.hnd = NULL; + wait.val = 0; + + /* init two task for */ + ret = hal_task_group_init(&tasks, 2, sizeof(EncAsyncTaskInfo)); + if (ret) { + mpp_err_f("hal_task_group_init failed ret %d\n", ret); + goto DONE;; + } + task.tasks = tasks; + + while (1) { + { + AutoMutex autolock(thd_enc->mutex()); + if (MPP_THREAD_RUNNING != thd_enc->get_status()) + break; + + if (check_enc_async_wait(enc, &wait)) { + enc_dbg_detail("wait start\n"); + thd_enc->wait(); + enc_dbg_detail("wait done\n"); + } + } + + // When encoder is not on encoding process external config and reset + // 1. process user control and reset flag + if (enc->cmd_send != enc->cmd_recv || enc->reset_flag) { + enc_dbg_detail("ctrl proc %d cmd %08x\n", enc->cmd_recv, enc->cmd); + + // wait all tasks done + while (MPP_OK == hal_task_get_hnd(tasks, TASK_PROCESSING, &hnd)) { + EncAsyncTaskInfo info; + + hal_task_hnd_get_info(hnd, &info); + + mpp_assert(!info.status.enc_done); + enc_async_wait_task(enc, &info); + hal_task_hnd_set_status(hnd, TASK_IDLE); + wait.task_hnd = 0; + } + + if (enc->cmd_send != enc->cmd_recv) { + sem_wait(&enc->cmd_start); + ret = mpp_enc_proc_cfg(enc, enc->cmd, enc->param); + if (ret) + *enc->cmd_ret = ret; + enc->cmd_recv++; + enc_dbg_detail("ctrl proc %d done send %d\n", enc->cmd_recv, + enc->cmd_send); + mpp_assert(enc->cmd_send == enc->cmd_send); + enc->param = NULL; + enc->cmd = (MpiCmd)0; + sem_post(&enc->cmd_done); + + // async cfg update process for hal + // mainly for buffer prepare + mpp_enc_hal_prepare(enc->enc_hal); + + /* NOTE: here will clear change flag of rc and prep cfg */ + mpp_enc_proc_rc_update(enc); + continue; + } + + if (enc->reset_flag) { + enc_dbg_detail("thread reset start\n"); + { + AutoMutex autolock(thd_enc->mutex()); + enc->status_flag = 0; + } + + AutoMutex autolock(thd_enc->mutex(THREAD_CONTROL)); + enc->reset_flag = 0; + sem_post(&enc->enc_reset); + enc_dbg_detail("thread reset done\n"); + continue; + } + } + + // 2. try get a task to encode + ret = try_get_async_task(enc, &task, &wait); + enc_dbg_detail("try_get_async_task ret %d\n", ret); + if (ret) { + hal_task_get_hnd(tasks, TASK_PROCESSING, &hnd); + if (hnd) { + EncAsyncTaskInfo info; + + hal_task_hnd_get_info(hnd, &info); + + mpp_assert(!info.status.enc_done); + enc_async_wait_task(enc, &info); + hal_task_hnd_set_status(hnd, TASK_IDLE); + wait.task_hnd = 0; + } + + continue; + } + + mpp_assert(task.info.task.valid); + + proc_async_task(enc, &task); + } + +DONE: + enc_dbg_func("loop done\n"); + + if (tasks) { + ret = hal_task_group_deinit(tasks); + tasks = NULL; + } + + enc_dbg_func("thread finish\n"); + + return NULL; +} diff --git a/mpp/codec/mpp_enc_v2.cpp b/mpp/codec/mpp_enc_v2.cpp index 482c3171..00a988e8 100644 --- a/mpp/codec/mpp_enc_v2.cpp +++ b/mpp/codec/mpp_enc_v2.cpp @@ -217,6 +217,24 @@ MPP_RET mpp_enc_start_v2(MppEnc ctx) return MPP_OK; } +MPP_RET mpp_enc_start_async(MppEnc ctx) +{ + MppEncImpl *enc = (MppEncImpl *)ctx; + char name[16]; + + enc_dbg_func("%p in\n", enc); + + snprintf(name, sizeof(name) - 1, "mpp_%se_%d", + strof_coding_type(enc->coding), getpid()); + + enc->thread_enc = new MppThread(mpp_enc_async_thread, enc->mpp, name); + enc->thread_enc->start(); + + enc_dbg_func("%p out\n", enc); + + return MPP_OK; +} + MPP_RET mpp_enc_stop_v2(MppEnc ctx) { MPP_RET ret = MPP_OK; diff --git a/mpp/inc/mpp.h b/mpp/inc/mpp.h index 5261cadb..a24a17e0 100644 --- a/mpp/inc/mpp.h +++ b/mpp/inc/mpp.h @@ -122,8 +122,10 @@ public: MPP_RET notify(RK_U32 flag); MPP_RET notify(MppBufferGroup group); - mpp_list *mPackets; - mpp_list *mFrames; + mpp_list *mPktIn; + mpp_list *mPktOut; + mpp_list *mFrmIn; + mpp_list *mFrmOut; /* counters for debug */ RK_U32 mPacketPutCount; RK_U32 mPacketGetCount; @@ -176,7 +178,7 @@ public: MppDec mDec; MppEnc mEnc; - RK_U32 mEncVersion; + RK_U32 mEncAyncIo; private: void clear(); @@ -208,6 +210,10 @@ private: MPP_RET control_enc(MpiCmd cmd, MppParam param); MPP_RET control_isp(MpiCmd cmd, MppParam param); + /* for special encoder async io mode */ + MPP_RET put_frame_async(MppFrame frame); + MPP_RET get_packet_async(MppPacket *packet); + Mpp(const Mpp &); Mpp &operator=(const Mpp &); }; diff --git a/mpp/mpp.cpp b/mpp/mpp.cpp index 0404d16b..a5998840 100644 --- a/mpp/mpp.cpp +++ b/mpp/mpp.cpp @@ -41,6 +41,8 @@ #define MPP_TEST_FRAME_SIZE SZ_1M #define MPP_TEST_PACKET_SIZE SZ_512K +typedef MPP_RET (*QueueFunc)(Mpp* mpp, MppPortType type, MppTask *task); + static void mpp_notify_by_buffer_group(void *arg, void *group) { Mpp *mpp = (Mpp *)arg; @@ -61,8 +63,10 @@ static void *list_wraper_frame(void *arg) } Mpp::Mpp(MppCtx ctx = NULL) - : mPackets(NULL), - mFrames(NULL), + : mPktIn(NULL), + mPktOut(NULL), + mFrmIn(NULL), + mFrmOut(NULL), mPacketPutCount(0), mPacketGetCount(0), mFramePutCount(0), @@ -85,7 +89,7 @@ Mpp::Mpp(MppCtx ctx = NULL) mCtx(ctx), mDec(NULL), mEnc(NULL), - mEncVersion(0), + mEncAyncIo(0), mType(MPP_CTX_BUTT), mCoding(MPP_VIDEO_CodingUnused), mInitDone(0), @@ -131,8 +135,8 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding) switch (mType) { case MPP_CTX_DEC : { - mPackets = new mpp_list(list_wraper_packet); - mFrames = new mpp_list(list_wraper_frame); + mPktIn = new mpp_list(list_wraper_packet); + mFrmOut = new mpp_list(list_wraper_frame); if (mInputTimeout == MPP_POLL_BUTT) mInputTimeout = MPP_POLL_NON_BLOCK; @@ -171,8 +175,10 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding) mInitDone = 1; } break; case MPP_CTX_ENC : { - mFrames = new mpp_list(NULL); - mPackets = new mpp_list(list_wraper_packet); + mPktIn = new mpp_list(list_wraper_packet); + mPktOut = new mpp_list(list_wraper_packet); + mFrmIn = new mpp_list(NULL); + mFrmOut = new mpp_list(NULL); if (mInputTimeout == MPP_POLL_BUTT) mInputTimeout = MPP_POLL_BLOCK; @@ -200,7 +206,14 @@ MPP_RET Mpp::init(MppCtxType type, MppCodingType coding) ret = mpp_enc_init_v2(&mEnc, &cfg); if (ret) break; - ret = mpp_enc_start_v2(mEnc); + + if (mInputTimeout == MPP_POLL_NON_BLOCK) { + mEncAyncIo = 1; + ret = mpp_enc_start_async(mEnc); + } else { + ret = mpp_enc_start_v2(mEnc); + } + if (ret) break; mInitDone = 1; @@ -264,13 +277,21 @@ void Mpp::clear() mExtraPacket = NULL; } - if (mPackets) { - delete mPackets; - mPackets = NULL; + if (mPktIn) { + delete mPktIn; + mPktIn = NULL; } - if (mFrames) { - delete mFrames; - mFrames = NULL; + if (mPktOut) { + delete mPktOut; + mPktOut = NULL; + } + if (mFrmIn) { + delete mFrmIn; + mFrmIn = NULL; + } + if (mFrmOut) { + delete mFrmOut; + mFrmOut = NULL; } if (mPacketGroup) { @@ -421,16 +442,16 @@ MPP_RET Mpp::get_frame(MppFrame *frame) if (!mInitDone) return MPP_ERR_INIT; - AutoMutex autoFrameLock(mFrames->mutex()); + AutoMutex autoFrameLock(mFrmOut->mutex()); MppFrame first = NULL; - if (0 == mFrames->list_size()) { + if (0 == mFrmOut->list_size()) { if (mOutputTimeout) { if (mOutputTimeout < 0) { /* block wait */ - mFrames->wait(); + mFrmOut->wait(); } else { - RK_S32 ret = mFrames->wait(mOutputTimeout); + RK_S32 ret = mFrmOut->wait(mOutputTimeout); if (ret) { if (ret == ETIMEDOUT) return MPP_ERR_TIMEOUT; @@ -444,16 +465,16 @@ MPP_RET Mpp::get_frame(MppFrame *frame) } } - if (mFrames->list_size()) { - mFrames->del_at_head(&first, sizeof(frame)); + if (mFrmOut->list_size()) { + mFrmOut->del_at_head(&first, sizeof(frame)); mFrameGetCount++; notify(MPP_OUTPUT_DEQUEUE); if (mMultiFrame) { MppFrame prev = first; MppFrame next = NULL; - while (mFrames->list_size()) { - mFrames->del_at_head(&next, sizeof(frame)); + while (mFrmOut->list_size()) { + mFrmOut->del_at_head(&next, sizeof(frame)); mFrameGetCount++; notify(MPP_OUTPUT_DEQUEUE); mpp_frame_set_next(prev, next); @@ -467,8 +488,8 @@ MPP_RET Mpp::get_frame(MppFrame *frame) // There is no way to wake up parser thread to continue decoding. // The put_packet only signal sem on may be it better to use sem on info // change too. - AutoMutex autoPacketLock(mPackets->mutex()); - if (mPackets->list_size()) + AutoMutex autoPacketLock(mPktIn->mutex()); + if (mPktIn->list_size()) notify(MPP_INPUT_ENQUEUE); } @@ -485,6 +506,9 @@ MPP_RET Mpp::put_frame(MppFrame frame) if (!mInitDone) return MPP_ERR_INIT; + if (mEncAyncIo) + return put_frame_async(frame); + MPP_RET ret = MPP_NOK; MppStopwatch stopwatch = NULL; @@ -586,6 +610,9 @@ MPP_RET Mpp::get_packet(MppPacket *packet) if (!mInitDone) return MPP_ERR_INIT; + if (mEncAyncIo) + return get_packet_async(packet); + MPP_RET ret = MPP_OK; MppTask task = NULL; @@ -626,6 +653,65 @@ RET: return ret; } +MPP_RET Mpp::put_frame_async(MppFrame frame) +{ + if (mFrmIn) { + AutoMutex autoLock(mFrmIn->mutex()); + + if (mFrmIn->list_size()) + return MPP_NOK; + + mFrmIn->add_at_tail(&frame, sizeof(frame)); + mFramePutCount++; + + notify(MPP_INPUT_ENQUEUE); + } + + return MPP_OK; +} + +MPP_RET Mpp::get_packet_async(MppPacket *packet) +{ + MppPacket pkt = NULL; + + AutoMutex autoPacketLock(mPktOut->mutex()); + + if (0 == mPktOut->list_size()) { + if (mOutputTimeout) { + if (mOutputTimeout < 0) { + /* block wait */ + mPktOut->wait(); + } else { + RK_S32 ret = mPktOut->wait(mOutputTimeout); + if (ret) { + if (ret == ETIMEDOUT) + return MPP_ERR_TIMEOUT; + else + return MPP_NOK; + } + } + } else { + /* NOTE: in non-block mode the sleep is to avoid user's dead loop */ + msleep(1); + } + } + + if (mPktOut->list_size()) { + mPktOut->del_at_head(&pkt, sizeof(pkt)); + mPacketGetCount++; + notify(MPP_OUTPUT_DEQUEUE); + } else { + AutoMutex autoFrameLock(mFrmIn->mutex()); + + if (mFrmIn->list_size()) + notify(MPP_INPUT_ENQUEUE); + } + + *packet = pkt; + + return MPP_OK; +} + MPP_RET Mpp::poll(MppPortType type, MppPollType timeout) { if (!mInitDone) @@ -783,10 +869,10 @@ MPP_RET Mpp::reset() * To avoid this case happen we need to save it on reset beginning * then restore it on reset end. */ - mPackets->lock(); - while (mPackets->list_size()) { + mPktIn->lock(); + while (mPktIn->list_size()) { MppPacket pkt = NULL; - mPackets->del_at_head(&pkt, sizeof(pkt)); + mPktIn->del_at_head(&pkt, sizeof(pkt)); mPacketGetCount++; RK_U32 flags = mpp_packet_get_flag(pkt); @@ -799,24 +885,24 @@ MPP_RET Mpp::reset() mpp_packet_deinit(&pkt); } } - mPackets->flush(); - mPackets->unlock(); + mPktIn->flush(); + mPktIn->unlock(); mpp_dec_reset(mDec); - mFrames->lock(); - mFrames->flush(); - mFrames->unlock(); + mFrmOut->lock(); + mFrmOut->flush(); + mFrmOut->unlock(); } else { - mFrames->lock(); - mFrames->flush(); - mFrames->unlock(); + mFrmOut->lock(); + mFrmOut->flush(); + mFrmOut->unlock(); mpp_enc_reset_v2(mEnc); - mPackets->lock(); - mPackets->flush(); - mPackets->unlock(); + mPktIn->lock(); + mPktIn->flush(); + mPktIn->unlock(); } return MPP_OK; @@ -967,8 +1053,8 @@ MPP_RET Mpp::control_dec(MpiCmd cmd, MppParam param) ret = mpp_dec_set_cfg_by_cmd(&mDecInitcfg, cmd, param); } break; case MPP_DEC_GET_STREAM_COUNT: { - AutoMutex autoLock(mPackets->mutex()); - *((RK_S32 *)param) = mPackets->list_size(); + AutoMutex autoLock(mPktIn->mutex()); + *((RK_S32 *)param) = mPktIn->list_size(); ret = MPP_OK; } break; case MPP_DEC_GET_VPUMEM_USED_COUNT : diff --git a/mpp/vproc/mpp_dec_vproc.cpp b/mpp/vproc/mpp_dec_vproc.cpp index 56af8ddb..ad305742 100644 --- a/mpp/vproc/mpp_dec_vproc.cpp +++ b/mpp/vproc/mpp_dec_vproc.cpp @@ -74,7 +74,7 @@ typedef struct MppDecVprocCtxImpl_t { static void dec_vproc_put_frame(Mpp *mpp, MppFrame frame, MppBuffer buf, RK_S64 pts, RK_U32 err) { - mpp_list *list = mpp->mFrames; + mpp_list *list = mpp->mFrmOut; MppFrame out = NULL; MppFrameImpl *impl = NULL;