diff --git a/inc/mpp_buffer.h b/inc/mpp_buffer.h index b0bf217a..765b842b 100644 --- a/inc/mpp_buffer.h +++ b/inc/mpp_buffer.h @@ -20,6 +20,18 @@ #include "rk_type.h" #include "mpp_err.h" +/* + * because buffer usage maybe unknown when decoder is not started + * buffer group may need to set a default group size limit + */ +#define SZ_1K (1024) +#define SZ_1M (SZ_1K*SZ_1K) +#define SZ_2M (SZ_1M*2) +#define SZ_4M (SZ_1M*4) +#define SZ_8M (SZ_1M*8) +#define SZ_64M (SZ_1M*64) +#define SZ_80M (SZ_1M*80) + typedef void* MppBuffer; typedef void* MppBufferGroup; @@ -76,18 +88,22 @@ typedef enum { MPP_BUFFER_TYPE_BUTT, } MppBufferType; +typedef union MppBufferData_t MppBufferData; +union MppBufferData_t { + void *ptr; + RK_S32 fd; +}; + typedef struct { MppBufferType type; size_t size; - union { - void *ptr; - RK_S32 fd; - } data; + MppBufferData data; } MppBufferCommit; -#define mpp_buffer_commit(...) _mpp_buffer_commit(MODULE_TAG, ## __VA_ARGS__) -#define mpp_buffer_get(...) _mpp_buffer_get(MODULE_TAG, ## __VA_ARGS__) -#define mpp_buffer_group_get(...) _mpp_buffer_group_get(MODULE_TAG, ## __VA_ARGS__) +#define BUFFER_GROUP_SIZE_DEFAULT (SZ_1M*80) + +#define mpp_buffer_get(...) mpp_buffer_get_with_tag(MODULE_TAG, ## __VA_ARGS__) +#define mpp_buffer_group_get(...) mpp_buffer_group_get_with_tag(MODULE_TAG, ## __VA_ARGS__) #ifdef __cplusplus extern "C" { @@ -98,13 +114,14 @@ extern "C" { * these interface will change value of group and buffer so before calling functions * parameter need to be checked. */ -MPP_RET _mpp_buffer_commit(const char *tag, MppBufferGroup group, MppBufferCommit *buffer); -MPP_RET _mpp_buffer_get(const char *tag, MppBufferGroup group, MppBuffer *buffer, size_t size); +MPP_RET mpp_buffer_commit(MppBufferGroup group, MppBufferCommit *info); +MPP_RET mpp_buffer_get_with_tag(const char *tag, MppBufferGroup group, MppBuffer *buffer, size_t size); MPP_RET mpp_buffer_put(MppBuffer *buffer); MPP_RET mpp_buffer_inc_ref(MppBuffer buffer); -MPP_RET _mpp_buffer_group_get(const char *tag, MppBufferGroup *group, MppBufferType type); +MPP_RET mpp_buffer_group_get_with_tag(const char *tag, MppBufferGroup *group, MppBufferType type); MPP_RET mpp_buffer_group_put(MppBufferGroup *group); +MPP_RET mpp_buffer_group_set_limit(MppBufferGroup group, size_t limit); #ifdef __cplusplus } diff --git a/inc/mpp_err.h b/inc/mpp_err.h index 7c80d421..26c30d45 100644 --- a/inc/mpp_err.h +++ b/inc/mpp_err.h @@ -24,9 +24,10 @@ typedef enum { MPP_SUCCESS = RK_SUCCESS, MPP_OK = RK_OK, - MPP_ERR_UNKNOW = -1, - MPP_ERR_NULL_PTR = -2, - MPP_ERR_MALLOC = -3, + MPP_NOK = -1, + MPP_ERR_UNKNOW = -2, + MPP_ERR_NULL_PTR = -3, + MPP_ERR_MALLOC = -4, MPP_ERR_BASE = -1000, diff --git a/mpp/mpp_buffer.cpp b/mpp/mpp_buffer.cpp index 8fef0356..11f90fbe 100644 --- a/mpp/mpp_buffer.cpp +++ b/mpp/mpp_buffer.cpp @@ -22,28 +22,67 @@ #include "mpp_mem.h" #include "mpp_buffer_impl.h" -MPP_RET _mpp_buffer_commit(const char *tag, MppBufferGroup group, MppBufferCommit *buffer) +MPP_RET mpp_buffer_commit(MppBufferGroup group, MppBufferCommit *info) { - return MPP_OK; + if (NULL == group || NULL == info) { + mpp_err("mpp_buffer_commit input null pointer group %p info %p\n", + group, info); + return MPP_ERR_NULL_PTR; + } + + MppBufferGroupImpl *p = (MppBufferGroupImpl *)group; + if (p->type != info->type || p->type >= MPP_BUFFER_TYPE_BUTT) { + mpp_err("mpp_buffer_commit invalid type found group %d info %d\n", + p->type, info->type); + return MPP_ERR_UNKNOW; + } + + MppBufferImpl *buffer; + + return mpp_buffer_init(&buffer, NULL, p->group_id, info->size, &info->data); } -MPP_RET _mpp_buffer_get(const char *tag, MppBufferGroup group, MppBuffer *buffer, size_t size) +MPP_RET mpp_buffer_get_with_tag(const char *tag, MppBufferGroup group, MppBuffer *buffer, size_t size) { - return MPP_OK; + if (NULL == group || NULL == buffer || 0 == size) { + mpp_err("mpp_buffer_get invalid input: group %p buffer %p size %u\n", + group, buffer, size); + return MPP_ERR_UNKNOW; + } + + MppBufferGroupImpl *tmp = (MppBufferGroupImpl *)group; + // try unused buffer first + MppBufferImpl *buf = mpp_buffer_get_unused(tmp, size); + if (NULL == buf) { + // if failed try init a new buffer + mpp_buffer_init(&buf, tag, tmp->group_id, size, NULL); + } + *buffer = buf; + return (buf) ? (MPP_OK) : (MPP_NOK); } MPP_RET mpp_buffer_put(MppBuffer *buffer) { - return MPP_OK; + if (NULL == buffer) { + mpp_err("mpp_buffer_put invalid input: buffer %p\n", buffer); + return MPP_ERR_UNKNOW; + } + + return mpp_buffer_ref_dec((MppBufferImpl*)*buffer); } MPP_RET mpp_buffer_inc_ref(MppBuffer buffer) { - return MPP_OK; + if (NULL == buffer) { + mpp_err("mpp_buffer_put invalid input: buffer %p\n", buffer); + return MPP_ERR_UNKNOW; + } + + return mpp_buffer_ref_inc((MppBufferImpl*)buffer); } -MPP_RET _mpp_buffer_group_get(const char *tag, MppBufferGroup *group, MppBufferType type) +MPP_RET mpp_buffer_group_get_with_tag(const char *tag, MppBufferGroup *group, MppBufferType type) { if (NULL == group || type >= MPP_BUFFER_TYPE_BUTT) { mpp_err("mpp_buffer_group_get input invalid group %p type %d\n", @@ -51,12 +90,25 @@ MPP_RET _mpp_buffer_group_get(const char *tag, MppBufferGroup *group, MppBufferT return MPP_ERR_UNKNOW; } - MppBufferGroupImpl *p; - return mpp_buffer_group_create(&p, tag, type); + return mpp_buffer_group_init((MppBufferGroupImpl**)group, tag, type); } MPP_RET mpp_buffer_group_put(MppBufferGroup *group) { - return MPP_OK; + if (NULL == group) { + mpp_err("mpp_buffer_group_put input invalid group %p\n", group); + return MPP_NOK; + } + + MppBufferGroupImpl *p = (MppBufferGroupImpl *)*group; + *group = NULL; + + return mpp_buffer_group_deinit(p); +} + +MPP_RET mpp_buffer_group_set_limit(MppBufferGroup group, size_t limit) +{ + MppBufferGroupImpl *p = (MppBufferGroupImpl *)group; + return mpp_buffer_group_limit_reset(p, limit); } diff --git a/mpp/mpp_buffer_impl.cpp b/mpp/mpp_buffer_impl.cpp index 26e08fb0..02337f01 100644 --- a/mpp/mpp_buffer_impl.cpp +++ b/mpp/mpp_buffer_impl.cpp @@ -45,30 +45,177 @@ static MppBufferService services = LIST_HEAD_INIT(services.list_orphan), }; -static RK_U32 add_group_to_service(MppBufferGroupImpl *p) +static MppBufferGroupImpl *search_buffer_group_by_id_no_lock(RK_U32 group_id) { - list_add_tail(&p->list_group, &services.list_group); - services.group_count++; - return services.group_id++; + MppBufferGroupImpl *pos, *n; + list_for_each_entry_safe(pos, n, &services.list_group, MppBufferGroupImpl, list_group) { + if (pos->group_id == group_id) { + return pos; + } + } + return NULL; } -static void del_group_from_service(MppBufferGroupImpl *p) +MPP_RET deinit_buffer_no_lock(MppBufferImpl *buffer) { - list_del_init(&p->list_group); - services.group_count--; + mpp_assert(buffer->ref_count == 0); + mpp_assert(buffer->used == 0); + + list_del_init(&buffer->list_status); + MppBufferGroupImpl *group = search_buffer_group_by_id_no_lock(buffer->group_id); + if (group) + group->usage -= buffer->size; + + mpp_free(buffer); + + return MPP_OK; } -static void add_orphan_to_service(MppBufferImpl *p) +static MPP_RET check_buffer_group_limit(MppBufferGroupImpl *group) { - list_add_tail(&p->list_status, &services.list_orphan); + if (group->usage > group->limit) { + MppBufferImpl *pos, *n; + list_for_each_entry_safe(pos, n, &group->list_unused, MppBufferImpl, list_status) { + deinit_buffer_no_lock(pos); + } + } + + return (group->usage > group->limit) ? (MPP_NOK) : (MPP_OK); } -static void del_orphan_from_service(MppBufferImpl *p) +static MPP_RET inc_buffer_ref_no_lock(MppBufferImpl *buffer) { - list_del_init(&p->list_status); + MPP_RET ret = MPP_OK; + if (!buffer->used) { + MppBufferGroupImpl *group = search_buffer_group_by_id_no_lock(buffer->group_id); + // NOTE: when increasing ref_count the unused buffer must be under certain group + mpp_assert(group); + buffer->used = 1; + if (group) { + list_del_init(&buffer->list_status); + list_add_tail(&buffer->list_status, &group->list_used); + } else { + mpp_err("mpp_buffer_ref_inc unused buffer without group\n"); + ret = MPP_NOK; + } + } + buffer->ref_count++; + return ret; } -MPP_RET mpp_buffer_group_create(MppBufferGroupImpl **group, const char *tag, MppBufferType type) +MPP_RET mpp_buffer_init(MppBufferImpl **buffer, const char *tag, RK_U32 group_id, size_t size, MppBufferData *data) +{ + MppBufferImpl *p = mpp_malloc(MppBufferImpl, 1); + if (NULL == p) { + mpp_err("mpp_buffer_init failed to allocate context\n"); + *buffer = NULL; + return MPP_ERR_MALLOC; + } + + MPP_BUFFER_SERVICE_LOCK(); + + MppBufferGroupImpl *group = search_buffer_group_by_id_no_lock(group_id); + if (group) { + if (NULL == tag) { + tag = group->tag; + } + strncpy(p->tag, tag, sizeof(p->tag)); + p->group_id = group_id; + p->size = size; + if (data) { + p->data = *data; + } else { + // TODO: if data is NULL need to allocate from allocator + } + p->used = 0; + p->ref_count = 0; + INIT_LIST_HEAD(&p->list_status); + list_add_tail(&p->list_status, &group->list_unused); + group->usage += size; + } else { + mpp_free(p); + p = NULL; + } + + MPP_BUFFER_SERVICE_UNLOCK(); + + *buffer = p; + return (p) ? (MPP_OK) : (MPP_NOK); +} + +MPP_RET mpp_buffer_deinit(MppBufferImpl *buffer) +{ + MPP_BUFFER_SERVICE_LOCK(); + + deinit_buffer_no_lock(buffer); + + MPP_BUFFER_SERVICE_UNLOCK(); + + return MPP_OK; +} + +MPP_RET mpp_buffer_ref_inc(MppBufferImpl *buffer) +{ + MPP_RET ret; + + MPP_BUFFER_SERVICE_LOCK(); + + ret = inc_buffer_ref_no_lock(buffer); + + MPP_BUFFER_SERVICE_UNLOCK(); + + return ret; +} + + +MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer) +{ + if (buffer->ref_count <= 0) { + mpp_err("mpp_buffer_ref_dec found non-positive ref_count %d\n", buffer->ref_count); + return MPP_NOK; + } + + MPP_BUFFER_SERVICE_LOCK(); + + buffer->ref_count--; + if (0 == buffer->ref_count) { + buffer->used = 0; + list_del_init(&buffer->list_status); + MppBufferGroupImpl *group = search_buffer_group_by_id_no_lock(buffer->group_id); + if (group) { + list_add_tail(&buffer->list_status, &group->list_unused); + check_buffer_group_limit(group); + } + } + + MPP_BUFFER_SERVICE_UNLOCK(); + + return MPP_OK; +} + +MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size) +{ + MppBufferImpl *buffer = NULL; + + MPP_BUFFER_SERVICE_LOCK(); + + if (!list_empty(&p->list_unused)) { + MppBufferImpl *pos, *n; + list_for_each_entry_safe(pos, n, &p->list_unused, MppBufferImpl, list_status) { + if (pos->size == size) { + buffer = pos; + inc_buffer_ref_no_lock(buffer); + break; + } + } + } + + MPP_BUFFER_SERVICE_UNLOCK(); + + return buffer; +} + +MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBufferType type) { MppBufferGroupImpl *p = mpp_malloc(MppBufferGroupImpl, 1); if (NULL == p) { @@ -83,11 +230,20 @@ MPP_RET mpp_buffer_group_create(MppBufferGroupImpl **group, const char *tag, Mpp INIT_LIST_HEAD(&p->list_used); INIT_LIST_HEAD(&p->list_unused); - RK_U32 group_id = add_group_to_service(p); + list_add_tail(&p->list_group, &services.list_group); - snprintf(p->tag, MPP_TAG_SIZE, "%s_%d", tag, group_id); + snprintf(p->tag, sizeof(p->tag), "%s_%d", tag, services.group_id); p->type = type; - p->group_id = group_id; + p->limit = BUFFER_GROUP_SIZE_DEFAULT; + p->usage = 0; + p->group_id = services.group_id; + + // avoid group_id reuse + do { + services.group_count++; + if (NULL == search_buffer_group_by_id_no_lock(services.group_count)) + break; + } while (p->group_id != services.group_count); MPP_BUFFER_SERVICE_UNLOCK(); *group = p; @@ -95,34 +251,48 @@ MPP_RET mpp_buffer_group_create(MppBufferGroupImpl **group, const char *tag, Mpp return MPP_OK; } -MPP_RET mpp_buffer_group_destroy(MppBufferGroupImpl *p) +MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p) { if (NULL == p) { - mpp_err("mpp_buffer_group_destroy found NULL pointer\n"); + mpp_err("mpp_buffer_group_deinit found NULL pointer\n"); return MPP_ERR_NULL_PTR; } MPP_BUFFER_SERVICE_LOCK(); - del_group_from_service(p); - // remove unused list if (!list_empty(&p->list_unused)) { MppBufferImpl *pos, *n; list_for_each_entry_safe(pos, n, &p->list_unused, MppBufferImpl, list_status) { - mpp_free(pos); + deinit_buffer_no_lock(pos); } } + // move used buffer to orphan list if (!list_empty(&p->list_used)) { MppBufferImpl *pos, *n; list_for_each_entry_safe(pos, n, &p->list_used, MppBufferImpl, list_status) { - add_orphan_to_service(pos); + list_add_tail(&pos->list_status, &services.list_orphan); } } + list_del_init(&p->list_group); + services.group_count--; + MPP_BUFFER_SERVICE_UNLOCK(); return MPP_OK; } +MPP_RET mpp_buffer_group_limit_reset(MppBufferGroupImpl *p, size_t limit) +{ + mpp_log("mpp_buffer_group %p limit reset from %u to %u\n", p->limit, limit); + + MPP_BUFFER_SERVICE_LOCK(); + p->limit = limit; + MPP_RET ret = check_buffer_group_limit(p); + MPP_BUFFER_SERVICE_UNLOCK(); + + return ret; +} + diff --git a/mpp/mpp_buffer_impl.h b/mpp/mpp_buffer_impl.h index be1b4b60..38574c95 100644 --- a/mpp/mpp_buffer_impl.h +++ b/mpp/mpp_buffer_impl.h @@ -30,25 +30,19 @@ #define MPP_BUF_FUNCTION_LEAVE_OK() mpp_buf_dbg(MPP_BUF_DBG_FUNCTION, "%s success\n", __FUNCTION__) #define MPP_BUF_FUNCTION_LEAVE_FAIL() mpp_buf_dbg(MPP_BUF_DBG_FUNCTION, "%s failed\n", __FUNCTION__) -typedef union MppBufferData_t MppBufferData; typedef struct MppBufferImpl_t MppBufferImpl; typedef struct MppBufferGroupImpl_t MppBufferGroupImpl; -union MppBufferData_t { - void *ptr; - RK_S32 fd; -}; - +// use index instead of pointer to avoid invalid pointer struct MppBufferImpl_t { char tag[MPP_TAG_SIZE]; - // use index instead of pointer to avoid invalid pointer RK_U32 group_id; - MppBufferType type; - size_t size; MppBufferData data; - RK_S32 ref_count; + // used flag is for used/unused list detection + RK_U32 used; + RK_S32 ref_count; struct list_head list_status; }; @@ -56,6 +50,8 @@ struct MppBufferGroupImpl_t { char tag[MPP_TAG_SIZE]; RK_U32 group_id; MppBufferType type; + size_t limit; + size_t usage; // link to the other MppBufferGroupImpl struct list_head list_group; @@ -71,8 +67,17 @@ extern "C" { extern RK_U32 mpp_buffer_debug; -MPP_RET mpp_buffer_group_create(MppBufferGroupImpl **group, const char *tag, MppBufferType type); -MPP_RET mpp_buffer_group_destroy(MppBufferGroupImpl *p); +MPP_RET mpp_buffer_init(MppBufferImpl **buffer, const char *tag, RK_U32 group_id, size_t size, MppBufferData *data); +MPP_RET mpp_buffer_deinit(MppBufferImpl *buffer); +MPP_RET mpp_buffer_ref_inc(MppBufferImpl *buffer); +MPP_RET mpp_buffer_ref_dec(MppBufferImpl *buffer); +// TODO: get unused is a block or wait function + +MppBufferImpl *mpp_buffer_get_unused(MppBufferGroupImpl *p, size_t size); + +MPP_RET mpp_buffer_group_init(MppBufferGroupImpl **group, const char *tag, MppBufferType type); +MPP_RET mpp_buffer_group_deinit(MppBufferGroupImpl *p); +MPP_RET mpp_buffer_group_limit_reset(MppBufferGroupImpl *p, size_t limit); #ifdef __cplusplus } diff --git a/test/mpp_buffer_test.c b/test/mpp_buffer_test.c index eb62d77c..abd44ee4 100644 --- a/test/mpp_buffer_test.c +++ b/test/mpp_buffer_test.c @@ -88,7 +88,7 @@ int main() } if (group) - mpp_buffer_group_put(group); + mpp_buffer_group_put(&group); mpp_log("mpp_buffer_test success\n"); return ret;