mirror of
https://github.com/nyanmisaka/mpp.git
synced 2025-10-04 08:46:24 +08:00
[mpp_cluster]: Add mpp_cluster module
Change-Id: I33befe847b6215572b470b41afea2398019d568c Signed-off-by: Herman Chen <herman.chen@rock-chips.com>
This commit is contained in:
@@ -8,6 +8,7 @@ add_library(mpp_base STATIC
|
||||
mpp_enc_ref.cpp
|
||||
mpp_enc_cfg.cpp
|
||||
mpp_dec_cfg.cpp
|
||||
mpp_cluster.cpp
|
||||
mpp_buf_slot.cpp
|
||||
mpp_buffer_impl.cpp
|
||||
mpp_buffer.cpp
|
||||
|
50
mpp/base/inc/mpp_cluster.h
Normal file
50
mpp/base/inc/mpp_cluster.h
Normal file
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright 2021 Rockchip Electronics Co. LTD
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef __MPP_CLUSTER_H__
|
||||
#define __MPP_CLUSTER_H__
|
||||
|
||||
#include "mpp_list.h"
|
||||
#include "mpp_thread.h"
|
||||
#include "mpp_dev_defs.h"
|
||||
|
||||
#define MAX_PRIORITY 1
|
||||
|
||||
typedef void* MppNode;
|
||||
|
||||
typedef RK_S32 (*TaskProc)(void *param);
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
MPP_RET mpp_node_init(MppNode *node);
|
||||
MPP_RET mpp_node_deinit(MppNode node);
|
||||
|
||||
MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param);
|
||||
|
||||
MPP_RET mpp_node_attach(MppNode node, MppClientType type);
|
||||
MPP_RET mpp_node_detach(MppNode node);
|
||||
|
||||
#define mpp_node_trigger(node, trigger) mpp_node_trigger_f(__FUNCTION__, node, trigger)
|
||||
|
||||
MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*__MPP_CLUSTER_H__*/
|
746
mpp/base/mpp_cluster.cpp
Normal file
746
mpp/base/mpp_cluster.cpp
Normal file
@@ -0,0 +1,746 @@
|
||||
/*
|
||||
* Copyright 2021 Rockchip Electronics Co. LTD
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#define MODULE_TAG "mpp_cluster"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "mpp_log.h"
|
||||
#include "mpp_mem.h"
|
||||
#include "mpp_env.h"
|
||||
#include "mpp_time.h"
|
||||
|
||||
#include "mpp_cluster.h"
|
||||
#include "mpp_dev_defs.h"
|
||||
|
||||
#define MPP_CLUSTER_DBG_FLOW (0x00000001)
|
||||
#define MPP_CLUSTER_DBG_LOCK (0x00000002)
|
||||
|
||||
#define cluster_dbg(flag, fmt, ...) _mpp_dbg(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
|
||||
#define cluster_dbg_f(flag, fmt, ...) _mpp_dbg_f(mpp_cluster_debug, flag, fmt, ## __VA_ARGS__)
|
||||
|
||||
#define cluster_dbg_flow(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_FLOW, fmt, ## __VA_ARGS__)
|
||||
#define cluster_dbg_lock(fmt, ...) cluster_dbg(MPP_CLUSTER_DBG_LOCK, fmt, ## __VA_ARGS__)
|
||||
|
||||
RK_U32 mpp_cluster_debug = 0;
|
||||
RK_U32 mpp_cluster_thd_cnt = 1;
|
||||
|
||||
typedef struct MppNodeProc_s MppNodeProc;
|
||||
typedef struct MppNodeTask_s MppNodeTask;
|
||||
typedef struct MppNodeImpl_s MppNodeImpl;
|
||||
|
||||
typedef struct ClusterQueue_s ClusterQueue;
|
||||
typedef struct ClusterWorker_s ClusterWorker;
|
||||
typedef struct MppCluster_s MppCluster;
|
||||
|
||||
#define NODE_VALID (0x00000001)
|
||||
#define NODE_IDLE (0x00000002)
|
||||
#define NODE_SIGNAL (0x00000004)
|
||||
#define NODE_WAIT (0x00000008)
|
||||
#define NODE_RUN (0x00000010)
|
||||
|
||||
#define NODE_ACT_NONE (0x00000000)
|
||||
#define NODE_ACT_IDLE_TO_WAIT (0x00000001)
|
||||
#define NODE_ACT_RUN_TO_SIGNAL (0x00000002)
|
||||
|
||||
typedef enum MppWorkerState_e {
|
||||
WORKER_IDLE,
|
||||
WORKER_RUNNING,
|
||||
|
||||
WORKER_STATE_BUTT,
|
||||
} MppWorkerState;
|
||||
|
||||
struct MppNodeProc_s {
|
||||
TaskProc proc;
|
||||
void *param;
|
||||
|
||||
/* timing statistic */
|
||||
RK_U32 run_count;
|
||||
RK_S64 run_time;
|
||||
};
|
||||
|
||||
struct MppNodeTask_s {
|
||||
struct list_head list_sched;
|
||||
MppNodeImpl *node;
|
||||
const char *node_name;
|
||||
|
||||
/* lock ptr to cluster queue lock */
|
||||
ClusterQueue *queue;
|
||||
|
||||
MppNodeProc *proc;
|
||||
};
|
||||
|
||||
/* MppNode will be embeded in MppCtx */
|
||||
struct MppNodeImpl_s {
|
||||
char name[32];
|
||||
/* list linked to scheduler */
|
||||
RK_S32 node_id;
|
||||
RK_U32 state;
|
||||
|
||||
MppNodeProc work;
|
||||
|
||||
RK_U32 priority;
|
||||
RK_S32 attached;
|
||||
sem_t sem_detach;
|
||||
|
||||
/* for cluster schedule */
|
||||
MppNodeTask task;
|
||||
};
|
||||
|
||||
struct ClusterQueue_s {
|
||||
MppCluster *cluster;
|
||||
|
||||
pthread_mutex_t lock;
|
||||
struct list_head list;
|
||||
RK_S32 count;
|
||||
};
|
||||
|
||||
struct ClusterWorker_s {
|
||||
char name[32];
|
||||
MppCluster *cluster;
|
||||
RK_S32 worker_id;
|
||||
|
||||
MppThread *thd;
|
||||
MppWorkerState state;
|
||||
|
||||
RK_S32 batch_count;
|
||||
RK_S32 work_count;
|
||||
struct list_head list_task;
|
||||
};
|
||||
|
||||
struct MppCluster_s {
|
||||
char name[32];
|
||||
pid_t pid;
|
||||
RK_S32 client_type;
|
||||
RK_S32 node_id;
|
||||
RK_S32 worker_id;
|
||||
|
||||
ClusterQueue queue[MAX_PRIORITY];
|
||||
RK_S32 node_count;
|
||||
|
||||
/* multi-worker info */
|
||||
RK_S32 worker_count;
|
||||
ClusterWorker *worker;
|
||||
MppThreadFunc worker_func;
|
||||
};
|
||||
|
||||
#define mpp_node_task_schedule(task) \
|
||||
mpp_node_task_schedule_f(__FUNCTION__, task)
|
||||
|
||||
#define mpp_node_task_schedule_from(caller, task) \
|
||||
mpp_node_task_schedule_f(caller, task)
|
||||
|
||||
#define cluster_queue_lock(queue) cluster_queue_lock_f(__FUNCTION__, queue)
|
||||
#define cluster_queue_unlock(queue) cluster_queue_unlock_f(__FUNCTION__, queue)
|
||||
|
||||
static MPP_RET cluster_queue_lock_f(const char *caller, ClusterQueue *queue)
|
||||
{
|
||||
MppCluster *cluster = queue->cluster;
|
||||
RK_S32 ret;
|
||||
|
||||
cluster_dbg_lock("%s lock queue at %s start\n", cluster->name, caller);
|
||||
|
||||
ret = pthread_mutex_lock(&queue->lock);
|
||||
|
||||
cluster_dbg_lock("%s lock queue at %s ret %d \n", cluster->name, caller, ret);
|
||||
|
||||
return (ret) ? MPP_NOK : MPP_OK;
|
||||
}
|
||||
|
||||
static MPP_RET cluster_queue_unlock_f(const char *caller, ClusterQueue *queue)
|
||||
{
|
||||
MppCluster *cluster = queue->cluster;
|
||||
RK_S32 ret;
|
||||
|
||||
cluster_dbg_lock("%s unlock queue at %s start\n", cluster->name, caller);
|
||||
|
||||
ret = pthread_mutex_unlock(&queue->lock);
|
||||
|
||||
cluster_dbg_lock("%s unlock queue at %s ret %d \n", cluster->name, caller, ret);
|
||||
|
||||
return (ret) ? MPP_NOK : MPP_OK;
|
||||
}
|
||||
|
||||
void cluster_signal_f(const char *caller, MppCluster *p);
|
||||
|
||||
MPP_RET mpp_cluster_queue_init(ClusterQueue *queue, MppCluster *cluster)
|
||||
{
|
||||
pthread_mutexattr_t attr;
|
||||
|
||||
pthread_mutexattr_init(&attr);
|
||||
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||||
pthread_mutex_init(&queue->lock, &attr);
|
||||
pthread_mutexattr_destroy(&attr);
|
||||
|
||||
queue->cluster = cluster;
|
||||
INIT_LIST_HEAD(&queue->list);
|
||||
queue->count = 0;
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_cluster_queue_deinit(ClusterQueue *queue)
|
||||
{
|
||||
mpp_assert(!queue->count);
|
||||
mpp_assert(list_empty(&queue->list));
|
||||
|
||||
pthread_mutex_destroy(&queue->lock);
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_task_attach(MppNodeTask *task, MppNodeImpl *node,
|
||||
ClusterQueue *queue, MppNodeProc *proc)
|
||||
{
|
||||
INIT_LIST_HEAD(&task->list_sched);
|
||||
|
||||
task->node = node;
|
||||
task->node_name = node->name;
|
||||
|
||||
task->queue = queue;
|
||||
task->proc = proc;
|
||||
|
||||
node->state = NODE_VALID | NODE_IDLE;
|
||||
node->attached = 1;
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_task_schedule_f(const char *caller, MppNodeTask *task)
|
||||
{
|
||||
ClusterQueue *queue = task->queue;
|
||||
MppCluster *cluster = queue->cluster;
|
||||
MppNodeImpl *node = task->node;
|
||||
MppNodeProc *proc = task->proc;
|
||||
const char *node_name = task->node_name;
|
||||
RK_U32 new_st;
|
||||
RK_U32 action = NODE_ACT_NONE;
|
||||
bool ret = false;
|
||||
|
||||
cluster_dbg_flow("%s sched from %s before [%d:%d] queue %d\n",
|
||||
node_name, caller, node->state, proc->run_count, queue->count);
|
||||
|
||||
do {
|
||||
RK_U32 old_st = node->state;
|
||||
|
||||
action = NODE_ACT_NONE;
|
||||
new_st = 0;
|
||||
|
||||
if (old_st & NODE_WAIT) {
|
||||
cluster_dbg_flow("%s sched task %x stay wait\n", node_name, old_st);
|
||||
break;
|
||||
}
|
||||
|
||||
if (old_st & NODE_IDLE) {
|
||||
new_st = old_st ^ (NODE_IDLE | NODE_WAIT);
|
||||
cluster_dbg_flow("%s sched task %x -> %x wait\n", node_name, old_st, new_st);
|
||||
action = NODE_ACT_IDLE_TO_WAIT;
|
||||
} else if (old_st & NODE_RUN) {
|
||||
new_st = old_st | NODE_SIGNAL;
|
||||
action = NODE_ACT_RUN_TO_SIGNAL;
|
||||
cluster_dbg_flow("%s sched task %x -> %x signal\n", node_name, old_st, new_st);
|
||||
} else {
|
||||
cluster_dbg_flow("%s sched task %x unknow state %x\n", node_name, old_st);
|
||||
}
|
||||
|
||||
ret = __sync_bool_compare_and_swap(&node->state, old_st, new_st);
|
||||
cluster_dbg_flow("%s sched task %x -> %x cas ret %d act %d\n",
|
||||
node_name, old_st, new_st, ret, action);
|
||||
} while (!ret);
|
||||
|
||||
switch (action) {
|
||||
case NODE_ACT_IDLE_TO_WAIT : {
|
||||
cluster_queue_lock(queue);
|
||||
mpp_assert(list_empty(&task->list_sched));
|
||||
list_add_tail(&task->list_sched, &queue->list);
|
||||
queue->count++;
|
||||
cluster_dbg_flow("%s sched task -> wq %s:%d\n", node_name, cluster->name, queue->count);
|
||||
cluster_queue_unlock(queue);
|
||||
|
||||
cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
|
||||
cluster_signal_f(caller, cluster);
|
||||
} break;
|
||||
case NODE_ACT_RUN_TO_SIGNAL : {
|
||||
cluster_dbg_flow("%s sched signal from %s\n", node_name, caller);
|
||||
cluster_signal_f(caller, cluster);
|
||||
} break;
|
||||
}
|
||||
|
||||
cluster_dbg_flow("%s sched from %s after [%d:%d] queue %d\n",
|
||||
node_name, caller, node->state, proc->run_count, queue->count);
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_task_detach(MppNodeTask *task)
|
||||
{
|
||||
MppNodeImpl *node = task->node;
|
||||
MPP_RET ret = MPP_OK;
|
||||
|
||||
if (node->attached) {
|
||||
const char *node_name = task->node_name;
|
||||
MppNodeProc *proc = task->proc;
|
||||
|
||||
__sync_fetch_and_and(&node->state, ~NODE_VALID);
|
||||
|
||||
mpp_node_task_schedule(task);
|
||||
|
||||
cluster_dbg_flow("%s state %x:%d wait detach start\n",
|
||||
node_name, node->state, proc->run_count);
|
||||
|
||||
sem_wait(&node->sem_detach);
|
||||
mpp_assert(node->attached == 0);
|
||||
|
||||
cluster_dbg_flow("%s state %x:%d wait detach done\n",
|
||||
node_name, node->state, proc->run_count);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_init(MppNode *node)
|
||||
{
|
||||
MppNodeImpl *p = mpp_calloc(MppNodeImpl, 1);
|
||||
if (p)
|
||||
sem_init(&p->sem_detach, 0, 0);
|
||||
|
||||
*node = p;
|
||||
|
||||
return p ? MPP_OK : MPP_NOK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_deinit(MppNode node)
|
||||
{
|
||||
MppNodeImpl *p = (MppNodeImpl *)node;
|
||||
|
||||
if (p) {
|
||||
if (p->attached)
|
||||
mpp_node_task_detach(&p->task);
|
||||
|
||||
mpp_assert(p->attached == 0);
|
||||
|
||||
sem_destroy(&p->sem_detach);
|
||||
|
||||
mpp_free(p);
|
||||
}
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_set_func(MppNode node, TaskProc proc, void *param)
|
||||
{
|
||||
MppNodeImpl *p = (MppNodeImpl *)node;
|
||||
if (!p)
|
||||
return MPP_NOK;
|
||||
|
||||
p->work.proc = proc;
|
||||
p->work.param = param;
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET cluster_worker_init(ClusterWorker *p, MppCluster *cluster)
|
||||
{
|
||||
MppThread *thd = NULL;
|
||||
MPP_RET ret = MPP_NOK;
|
||||
|
||||
INIT_LIST_HEAD(&p->list_task);
|
||||
p->worker_id = cluster->worker_id++;
|
||||
|
||||
p->batch_count = 1;
|
||||
p->work_count = 0;
|
||||
p->cluster = cluster;
|
||||
p->state = WORKER_IDLE;
|
||||
snprintf(p->name, sizeof(p->name) - 1, "%d:W%d", cluster->pid, p->worker_id);
|
||||
thd = new MppThread(cluster->worker_func, p, p->name);
|
||||
if (thd) {
|
||||
p->thd = thd;
|
||||
thd->start();
|
||||
ret = MPP_OK;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
MPP_RET cluster_worker_deinit(ClusterWorker *p)
|
||||
{
|
||||
if (p->thd) {
|
||||
p->thd->stop();
|
||||
delete p->thd;
|
||||
p->thd = NULL;
|
||||
}
|
||||
|
||||
mpp_assert(list_empty(&p->list_task));
|
||||
mpp_assert(p->work_count == 0);
|
||||
|
||||
p->batch_count = 0;
|
||||
p->cluster = NULL;
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
RK_S32 cluster_worker_get_task(ClusterWorker *p)
|
||||
{
|
||||
MppCluster *cluster = p->cluster;
|
||||
RK_S32 batch_count = p->batch_count;
|
||||
RK_S32 count = 0;
|
||||
RK_U32 new_st;
|
||||
RK_U32 old_st;
|
||||
bool ret;
|
||||
RK_S32 i;
|
||||
|
||||
cluster_dbg_flow("%s get %d task start\n", p->name, batch_count);
|
||||
|
||||
for (i = 0; i < MAX_PRIORITY; i++) {
|
||||
ClusterQueue *queue = &cluster->queue[i];
|
||||
MppNodeTask *task = NULL;
|
||||
MppNodeImpl *node = NULL;
|
||||
|
||||
do {
|
||||
cluster_queue_lock(queue);
|
||||
|
||||
if (list_empty(&queue->list)) {
|
||||
mpp_assert(queue->count == 0);
|
||||
cluster_dbg_flow("%s get P%d task ret no task\n", p->name, i);
|
||||
cluster_queue_unlock(queue);
|
||||
break;
|
||||
}
|
||||
|
||||
mpp_assert(queue->count);
|
||||
task = list_first_entry(&queue->list, MppNodeTask, list_sched);
|
||||
list_del_init(&task->list_sched);
|
||||
node = task->node;
|
||||
|
||||
queue->count--;
|
||||
|
||||
do {
|
||||
old_st = node->state;
|
||||
new_st = old_st ^ (NODE_WAIT | NODE_RUN);
|
||||
|
||||
mpp_assert(old_st & NODE_WAIT);
|
||||
ret = __sync_bool_compare_and_swap(&node->state, old_st, new_st);
|
||||
} while (!ret);
|
||||
|
||||
list_add_tail(&task->list_sched, &p->list_task);
|
||||
p->work_count++;
|
||||
count++;
|
||||
|
||||
cluster_dbg_flow("%s get P%d %s -> rq %d\n", p->name, i, node->name, p->work_count);
|
||||
|
||||
cluster_queue_unlock(queue);
|
||||
|
||||
if (count >= batch_count)
|
||||
break;
|
||||
} while (1);
|
||||
|
||||
if (count >= batch_count)
|
||||
break;
|
||||
}
|
||||
|
||||
cluster_dbg_flow("%s get %d task ret %d\n", p->name, batch_count, count);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
static void cluster_worker_run_task(ClusterWorker *p)
|
||||
{
|
||||
RK_U32 new_st;
|
||||
RK_U32 old_st;
|
||||
bool cas_ret;
|
||||
|
||||
cluster_dbg_flow("%s run %d work start\n", p->name, p->work_count);
|
||||
|
||||
while (!list_empty(&p->list_task)) {
|
||||
MppNodeTask *task = list_first_entry(&p->list_task, MppNodeTask, list_sched);
|
||||
MppNodeProc *proc = task->proc;
|
||||
MppNodeImpl *node = task->node;
|
||||
RK_S64 time_start;
|
||||
RK_S64 time_end;
|
||||
RK_U32 state;
|
||||
MPP_RET proc_ret;
|
||||
|
||||
/* check trigger for re-add task */
|
||||
cluster_dbg_flow("%s run %s start atate %d\n", p->name, task->node_name, node->state);
|
||||
mpp_assert(node->state & NODE_RUN);
|
||||
if (!(node->state & NODE_RUN))
|
||||
mpp_err_f("%s run state check %x is invalid on run", p->name, node->state);
|
||||
|
||||
time_start = mpp_time();
|
||||
proc_ret = (MPP_RET)proc->proc(proc->param);
|
||||
time_end = mpp_time();
|
||||
|
||||
cluster_dbg_flow("%s run %s ret %d\n", p->name, task->node_name, proc_ret);
|
||||
proc->run_time += time_end - time_start;
|
||||
proc->run_count++;
|
||||
|
||||
state = node->state;
|
||||
if (!(state & NODE_VALID)) {
|
||||
cluster_dbg_flow("%s run found destroy\n", p->name);
|
||||
list_del_init(&task->list_sched);
|
||||
node->attached = 0;
|
||||
|
||||
sem_post(&node->sem_detach);
|
||||
cluster_dbg_flow("%s run sem post done\n", p->name);
|
||||
} else if (state & NODE_SIGNAL) {
|
||||
ClusterQueue *queue = task->queue;
|
||||
|
||||
list_del_init(&task->list_sched);
|
||||
|
||||
do {
|
||||
old_st = state;
|
||||
// NOTE: clear NODE_RUN and NODE_SIGNAL, set NODE_WAIT
|
||||
new_st = old_st ^ (NODE_SIGNAL | NODE_WAIT | NODE_RUN);
|
||||
cas_ret = __sync_bool_compare_and_swap(&node->state, old_st, new_st);
|
||||
} while (!cas_ret);
|
||||
|
||||
cluster_dbg_flow("%s run state %x -> %x signal -> wait\n", p->name, old_st, new_st);
|
||||
|
||||
cluster_queue_lock(queue);
|
||||
list_add_tail(&task->list_sched, &queue->list);
|
||||
queue->count++;
|
||||
cluster_queue_unlock(queue);
|
||||
} else {
|
||||
list_del_init(&task->list_sched);
|
||||
do {
|
||||
old_st = node->state;
|
||||
new_st = old_st ^ (NODE_IDLE | NODE_RUN);
|
||||
|
||||
cas_ret = __sync_bool_compare_and_swap(&node->state, old_st, new_st);
|
||||
} while (!cas_ret);
|
||||
mpp_assert(node->state & NODE_IDLE);
|
||||
mpp_assert(!(node->state & NODE_RUN));
|
||||
|
||||
cluster_dbg_flow("%s run state %x -> %x run -> idle\n", p->name, old_st, new_st);
|
||||
}
|
||||
|
||||
p->work_count--;
|
||||
}
|
||||
|
||||
mpp_assert(p->work_count == 0);
|
||||
|
||||
cluster_dbg_flow("%s run all done\n", p->name);
|
||||
}
|
||||
|
||||
static void *cluster_worker(void *data)
|
||||
{
|
||||
ClusterWorker *p = (ClusterWorker *)data;
|
||||
MppThread *thd = p->thd;
|
||||
|
||||
while (1) {
|
||||
{
|
||||
RK_S32 task_count = 0;
|
||||
|
||||
cluster_dbg_lock("%s lock start\n", p->name);
|
||||
AutoMutex autolock(thd->mutex());
|
||||
cluster_dbg_lock("%s lock done\n", p->name);
|
||||
|
||||
if (MPP_THREAD_RUNNING != thd->get_status())
|
||||
break;
|
||||
|
||||
task_count = cluster_worker_get_task(p);
|
||||
if (!task_count) {
|
||||
p->state = WORKER_IDLE;
|
||||
thd->wait();
|
||||
p->state = WORKER_RUNNING;
|
||||
}
|
||||
}
|
||||
|
||||
cluster_worker_run_task(p);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void cluster_signal_f(const char *caller, MppCluster *p)
|
||||
{
|
||||
RK_S32 i;
|
||||
|
||||
cluster_dbg_flow("%s signal from %s\n", p->name, caller);
|
||||
|
||||
for (i = 0; i < p->worker_count; i++) {
|
||||
ClusterWorker *worker = &p->worker[i];
|
||||
MppThread *thd = worker->thd;
|
||||
AutoMutex auto_lock(thd->mutex());
|
||||
|
||||
if (worker->state == WORKER_IDLE) {
|
||||
thd->signal();
|
||||
cluster_dbg_flow("%s signal\n", p->name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MppClusterServer;
|
||||
|
||||
MppClusterServer *cluster_server = NULL;
|
||||
|
||||
class MppClusterServer : Mutex
|
||||
{
|
||||
private:
|
||||
// avoid any unwanted function
|
||||
MppClusterServer();
|
||||
~MppClusterServer();
|
||||
MppClusterServer(const MppClusterServer &);
|
||||
MppClusterServer &operator=(const MppClusterServer &);
|
||||
|
||||
MppCluster *mClusters[VPU_CLIENT_BUTT];
|
||||
|
||||
public:
|
||||
static MppClusterServer *single() {
|
||||
static MppClusterServer inst;
|
||||
cluster_server = &inst;
|
||||
return &inst;
|
||||
}
|
||||
|
||||
MppCluster *get(MppClientType client_type);
|
||||
MPP_RET put(MppClientType client_type);
|
||||
};
|
||||
|
||||
MppClusterServer::MppClusterServer()
|
||||
{
|
||||
memset(mClusters, 0, sizeof(mClusters));
|
||||
|
||||
mpp_env_get_u32("mpp_cluster_debug", &mpp_cluster_debug, 0);
|
||||
mpp_env_get_u32("mpp_cluster_thd_cnt", &mpp_cluster_thd_cnt, 1);
|
||||
}
|
||||
|
||||
MppClusterServer::~MppClusterServer()
|
||||
{
|
||||
RK_S32 i;
|
||||
|
||||
for (i = 0; i < VPU_CLIENT_BUTT; i++)
|
||||
put((MppClientType)i);
|
||||
}
|
||||
|
||||
MppCluster *MppClusterServer::get(MppClientType client_type)
|
||||
{
|
||||
RK_S32 i;
|
||||
MppCluster *p = NULL;
|
||||
|
||||
if (client_type >= VPU_CLIENT_BUTT)
|
||||
goto done;
|
||||
|
||||
{
|
||||
AutoMutex auto_lock(this);
|
||||
|
||||
p = mClusters[client_type];
|
||||
if (p)
|
||||
goto done;
|
||||
|
||||
p = mpp_malloc(MppCluster, 1);
|
||||
if (p) {
|
||||
for (i = 0; i < MAX_PRIORITY; i++)
|
||||
mpp_cluster_queue_init(&p->queue[i], p);
|
||||
|
||||
p->pid = getpid();
|
||||
p->client_type = client_type;
|
||||
snprintf(p->name, sizeof(p->name) - 1, "%d:%d", p->pid, client_type);
|
||||
p->node_id = 0;
|
||||
p->worker_id = 0;
|
||||
p->worker_func = cluster_worker;
|
||||
p->worker_count = mpp_cluster_thd_cnt;
|
||||
|
||||
mpp_assert(p->worker_count > 0);
|
||||
|
||||
p->worker = mpp_malloc(ClusterWorker, p->worker_count);
|
||||
|
||||
for (i = 0; i < p->worker_count; i++)
|
||||
cluster_worker_init(&p->worker[i], p);
|
||||
|
||||
mClusters[client_type] = p;
|
||||
cluster_dbg_flow("%s created\n", p->name);
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
if (p)
|
||||
cluster_dbg_flow("%s get\n", p->name);
|
||||
else
|
||||
cluster_dbg_flow("%d get cluster %d failed\n", getpid(), client_type);
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
MPP_RET MppClusterServer::put(MppClientType client_type)
|
||||
{
|
||||
RK_S32 i;
|
||||
|
||||
if (client_type >= VPU_CLIENT_BUTT)
|
||||
return MPP_NOK;
|
||||
|
||||
AutoMutex auto_lock(this);
|
||||
MppCluster *p = mClusters[client_type];
|
||||
|
||||
if (!p)
|
||||
return MPP_NOK;
|
||||
|
||||
for (i = 0; i < p->worker_count; i++)
|
||||
cluster_worker_deinit(&p->worker[i]);
|
||||
|
||||
cluster_dbg_flow("put %s\n", p->name);
|
||||
|
||||
mpp_free(p);
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_attach(MppNode node, MppClientType type)
|
||||
{
|
||||
MppNodeImpl *impl = (MppNodeImpl *)node;
|
||||
MppCluster *p = MppClusterServer::single()->get(type);
|
||||
RK_U32 priority = impl->priority;
|
||||
ClusterQueue *queue = &p->queue[priority];
|
||||
|
||||
mpp_assert(priority < MAX_PRIORITY);
|
||||
mpp_assert(p);
|
||||
|
||||
impl->node_id = __sync_fetch_and_add(&p->node_id, 1);
|
||||
|
||||
snprintf(impl->name, sizeof(impl->name) - 1, "%s:%d", p->name, impl->node_id);
|
||||
|
||||
mpp_node_task_attach(&impl->task, impl, queue, &impl->work);
|
||||
|
||||
__sync_fetch_and_add(&p->node_count, 1);
|
||||
|
||||
cluster_dbg_flow("%s:%d attached %d\n", p->name, impl->node_id, p->node_count);
|
||||
|
||||
/* attach and run once first */
|
||||
mpp_node_task_schedule(&impl->task);
|
||||
cluster_dbg_flow("%s trigger signal from %s\n", impl->name, __FUNCTION__);
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_detach(MppNode node)
|
||||
{
|
||||
MppNodeImpl *impl = (MppNodeImpl *)node;
|
||||
|
||||
mpp_node_task_detach(&impl->task);
|
||||
cluster_dbg_flow("%s detached\n", impl->name);
|
||||
|
||||
return MPP_OK;
|
||||
}
|
||||
|
||||
MPP_RET mpp_node_trigger_f(const char *caller, MppNode node, RK_S32 trigger)
|
||||
{
|
||||
if (trigger) {
|
||||
MppNodeImpl *impl = (MppNodeImpl *)node;
|
||||
|
||||
mpp_node_task_schedule_from(caller, &impl->task);
|
||||
}
|
||||
|
||||
return MPP_OK;
|
||||
}
|
@@ -34,6 +34,9 @@ add_mpp_base_test(mpp_bit)
|
||||
# mpp_trie unit test
|
||||
add_mpp_base_test(mpp_trie)
|
||||
|
||||
# mpp_cluster unit test
|
||||
add_mpp_base_test(mpp_cluster)
|
||||
|
||||
# mpp_enc_cfg unit test
|
||||
add_mpp_base_test(mpp_enc_cfg)
|
||||
|
||||
|
104
mpp/base/test/mpp_cluster_test.c
Normal file
104
mpp/base/test/mpp_cluster_test.c
Normal file
@@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Copyright 2021 Rockchip Electronics Co. LTD
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#define MODULE_TAG "mpp_cluster_test"
|
||||
|
||||
#include "mpp_log.h"
|
||||
#include "mpp_mem.h"
|
||||
#include "mpp_time.h"
|
||||
#include "mpp_common.h"
|
||||
|
||||
#include "mpp_cluster.h"
|
||||
|
||||
typedef struct MppTestNode_t {
|
||||
MppNode node;
|
||||
} MppTestNode;
|
||||
|
||||
MppTestNode test_node;
|
||||
|
||||
static RK_S32 mpp_cluster_test_worker(void *param)
|
||||
{
|
||||
RK_S32 ret = MPP_NOK;
|
||||
(void) param;
|
||||
|
||||
mpp_log_f("worker run start\n");
|
||||
mpp_log_f("worker run ret %d\n", ret);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
MPP_RET ret = MPP_OK;
|
||||
MppNode node = test_node.node;
|
||||
RK_U32 total_run = 2;
|
||||
|
||||
mpp_log("mpp_cluster_test start\n");
|
||||
|
||||
ret = mpp_node_init(&node);
|
||||
if (ret) {
|
||||
mpp_err("mpp_node_init failed ret %d\n", ret);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
mpp_log("mpp_cluster_test init node done\n");
|
||||
|
||||
/* setup node info */
|
||||
mpp_node_set_func(node, mpp_cluster_test_worker, &test_node);
|
||||
|
||||
mpp_log("mpp_cluster_test attach node start\n");
|
||||
ret = mpp_node_attach(node, VPU_CLIENT_RKVDEC);
|
||||
if (ret) {
|
||||
mpp_err("mpp_node_attach failed ret %d\n", ret);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
mpp_log("mpp_cluster_test trigger start\n");
|
||||
|
||||
do {
|
||||
ret = mpp_node_trigger(node, 1);
|
||||
if (ret) {
|
||||
mpp_err("mpp_node_trigger failed ret %d\n", ret);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
mpp_log("mpp_cluster_test trigger %d left\n", total_run);
|
||||
|
||||
msleep(5);
|
||||
} while (--total_run);
|
||||
|
||||
mpp_log("mpp_cluster_test detach start\n");
|
||||
|
||||
ret = mpp_node_detach(node);
|
||||
if (ret) {
|
||||
mpp_err("mpp_node_detach failed ret %d\n", ret);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
mpp_log("mpp_cluster_test deinit start\n");
|
||||
|
||||
ret = mpp_node_deinit(node);
|
||||
if (ret) {
|
||||
mpp_err("mpp_node_deinit failed ret %d\n", ret);
|
||||
goto DONE;
|
||||
}
|
||||
|
||||
mpp_log("mpp_cluster_test deinit done\n");
|
||||
|
||||
DONE:
|
||||
mpp_log("mpp_cluster_test done %s\n", ret ? "failed" : "success");
|
||||
return ret;
|
||||
}
|
Reference in New Issue
Block a user