# Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import os

import cv2
import numpy as np
from PIL import Image

import MxpiDataType_pb2 as MxpiDataType
from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector, \
    MxProtobufIn, InProtobufVector
# BGR colors for drawing the 80 keypoints (5 colors repeated 16 times)
color2 = [(0, 0, 255), (0, 255, 0), (255, 0, 0), (0, 0, 139), (0, 69, 255)] * 16


def get_img_metas(file_name):
    """Return [org_height, org_width, resize_ratio, resize_ratio] for the detector."""
    img_temp = Image.open(file_name)
    img_size = img_temp.size
    org_width, org_height = img_size
    # Fit the image inside 1280 x 768 while keeping the aspect ratio
    resize_ratio = 1280 / org_width
    if resize_ratio > 768 / org_height:
        resize_ratio = 768 / org_height
    img_metas = np.array([img_size[1], img_size[0]] +
                         [resize_ratio, resize_ratio])
    return img_metas
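
# Illustrative check (hypothetical 1920x1080 input, not part of the pipeline):
# resize_ratio = min(1280 / 1920, 768 / 1080) = 2 / 3, so get_img_metas would
# return array([1080, 1920, 0.6667, 0.6667]), i.e. (height, width, ratio, ratio).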


def bbox2result_1image(bboxes, labels, num_classes):
    """Convert detection results to a list of numpy arrays.

    Args:
        bboxes (ndarray): shape (n, 5)
        labels (ndarray): shape (n, )
        num_classes (int): class number, including background class

    Returns:
        tuple: per-class bbox results and the person-class (label 0) boxes
    """
    if bboxes.shape[0] == 0:
        result = [np.zeros((0, 5), dtype=np.float32)
                  for i in range(num_classes - 1)]
    else:
        result = [bboxes[labels == i, :] for i in range(num_classes - 1)]
    result_person = bboxes[labels == 0, :]
    return result, result_person
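
# Illustrative example (hypothetical inputs): for two boxes labelled [0, 2] and
# num_classes=81, the call returns 80 per-class arrays (all empty except
# indices 0 and 2) plus result_person holding the single label-0 box.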


def box_to_center_scale(box, model_image_width, model_image_height):
    """Convert a ((x1, y1), (x2, y2)) person box to center and scale."""
    pixel_std = 200
    center_temp = np.zeros((2), dtype=np.float32)
    bottom_left_point = box[0]
    top_right_point = box[1]
    bbox_w = top_right_point[0] - bottom_left_point[0]
    bbox_h = top_right_point[1] - bottom_left_point[1]
    bottom_left_x = bottom_left_point[0]
    bottom_left_y = bottom_left_point[1]
    center_temp[0] = bottom_left_x + bbox_w * 0.5
    center_temp[1] = bottom_left_y + bbox_h * 0.5
    # Pad the box to the model's aspect ratio before scaling
    ratio_w2h = model_image_width * 1.0 / model_image_height
    if bbox_w > ratio_w2h * bbox_h:
        bbox_h = bbox_w * 1.0 / ratio_w2h
    elif bbox_w < ratio_w2h * bbox_h:
        bbox_w = bbox_h * ratio_w2h
    scale_temp = np.array(
        [bbox_w * 1.0 / pixel_std, bbox_h * 1.0 / pixel_std],
        dtype=np.float32)
    if center_temp[0] != -1:
        scale_temp = scale_temp * 1.25
    return center_temp, scale_temp
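
# Worked example (hypothetical box, model input 288x384): for
# box = ((100, 100), (300, 500)) the raw width 200 is padded to
# 400 * (288 / 384) = 300 to match the aspect ratio, giving
# center = [200., 300.] and scale = [300 / 200, 400 / 200] * 1.25 = [1.875, 2.5].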


def get_dir(src_point, rot_rad):
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    _src_result = [0, 0]
    _src_result[0] = src_point[0] * cs - src_point[1] * sn
    _src_result[1] = src_point[0] * sn + src_point[1] * cs
    return _src_result


def transform_preds(coords, c, s, output_size):
    target_coords = np.zeros(coords.shape)
    trans_temp = get_affine_transform(c, s, 0, output_size, inv=1)
    for p in range(coords.shape[0]):
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans_temp)
    return target_coords


def affine_transform(pt, t):
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_3rd_point(a, b):
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)


def get_affine_transform(
        center_temp, _scale, rot, output_size,
        shift=np.array([0, 0], dtype=np.float32), inv=0
):
    if not isinstance(_scale, np.ndarray) and not isinstance(_scale, list):
        _scale = np.array([_scale, _scale])
    scale_tmp = _scale * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]
    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)
    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center_temp + scale_tmp * shift
    src[1, :] = center_temp + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])
    if inv:
        _trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        _trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    return _trans
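
# Sanity-check sketch (assumed values): since inv=1 only swaps the src/dst
# point sets, the two transforms are exact inverses, e.g.
#   t_fwd = get_affine_transform(np.array([200., 300.]), np.array([1.875, 2.5]), 0, [288, 384])
#   t_inv = get_affine_transform(np.array([200., 300.]), np.array([1.875, 2.5]), 0, [288, 384], inv=1)
# and affine_transform(affine_transform(p, t_fwd), t_inv) recovers p.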


def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_heatmaps should be 4-ndim'
    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    _maxvals = np.amax(heatmaps_reshaped, 2)
    _maxvals = _maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))
    _preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    _preds[:, :, 0] = (_preds[:, :, 0]) % width
    _preds[:, :, 1] = np.floor((_preds[:, :, 1]) / width)
    pred_mask = np.tile(np.greater(_maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)
    _preds *= pred_mask
    return _preds, _maxvals
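
# Illustrative decode (hypothetical 4x4 heatmap): a single map whose maximum
# sits at row 2, column 3 flattens to argmax index 11, which get_max_preds
# converts back to coordinates (x, y) = (11 % 4, 11 // 4) = (3, 2).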


def get_final_preds(batch_heatmaps, c, s):
    coords, _maxvals = get_max_preds(batch_heatmaps)
    heatmap_height = batch_heatmaps.shape[2]
    heatmap_width = batch_heatmaps.shape[3]
    # post-processing: shift each peak a quarter pixel along the local gradient
    for n in range(coords.shape[0]):
        for p in range(coords.shape[1]):
            hm = batch_heatmaps[n][p]
            px = int(math.floor(coords[n][p][0] + 0.5))
            py = int(math.floor(coords[n][p][1] + 0.5))
            if 1 < px < heatmap_width - 1 and 1 < py < heatmap_height - 1:
                diff = np.array(
                    [
                        hm[py][px + 1] - hm[py][px - 1],
                        hm[py + 1][px] - hm[py - 1][px]
                    ]
                )
                coords[n][p] += np.sign(diff) * .25
    _preds = coords.copy()
    # Transform back to the original image coordinates
    for ii in range(coords.shape[0]):
        _preds[ii] = transform_preds(
            coords[ii], c[ii], s[ii], [heatmap_width, heatmap_height]
        )
    return _preds, _maxvals


def mask_generate(_filter, num_joint):
    """Build a (len(_filter), num_joint, 2) mask enabling only the joints of
    each predicted class: class_num[i] is the joint count of class i, and
    idx_num holds the starting joint index of each class (prefix sums).
    """
    class_num = [27, 5, 10, 6, 6, 4, 2, 3, 3, 1,
                 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    idx_num = [0, 27, 32, 42, 48, 54, 58, 60, 63, 66, 67,
               68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
    _mask = np.zeros(
        (len(_filter), num_joint, 2),
        dtype=np.float32
    )
    for i1, index in enumerate(_filter):
        for j1 in range(idx_num[index], idx_num[index + 1]):
            _mask[i1][j1][0] = 1
            _mask[i1][j1][1] = 1
    return _mask
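
# Illustrative example (hypothetical filter): mask_generate([1], 80) turns on
# exactly the class_num[1] = 5 joints at indices idx_num[1]..idx_num[2]-1
# (27..31); all other rows of the (1, 80, 2) mask stay zero.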


def draw_pose(keypoints, _img):
    """Draw the keypoints on the image.

    :params keypoints: the shape should be equal to [80, 2]
    :params _img: the BGR image to draw on
    """
    assert keypoints.shape == (80, 2)
    for _i in range(80):
        x_a, y_a = keypoints[_i][0], keypoints[_i][1]
        cv2.circle(_img, (int(x_a), int(y_a)), 10, color2[_i], 10)


def normalize(data, _mean, _std):
    # NumPy implementation of transforms.ToTensor followed by transforms.Normalize
    if not isinstance(_mean, np.ndarray):
        _mean = np.array(_mean)
    if not isinstance(_std, np.ndarray):
        _std = np.array(_std)
    if _mean.ndim == 1:
        _mean = np.reshape(_mean, (-1, 1, 1))
    if _std.ndim == 1:
        _std = np.reshape(_std, (-1, 1, 1))
    _div = np.divide(data, 255)
    _div = np.transpose(_div, (2, 0, 1))
    _sub = np.subtract(_div, _mean)
    out_normalize = np.divide(_sub, _std)
    return out_normalize
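
# Minimal usage sketch (assumed dummy input): normalize converts an HWC uint8
# image to a normalized CHW float array, e.g.
#   dummy = np.zeros((384, 288, 3), dtype=np.uint8)
#   out = normalize(dummy, [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
#   # out.shape == (3, 384, 288); every value of out[0] == (0 - 0.485) / 0.229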


if __name__ == '__main__':
    # stream manager init
    streamManagerApi = StreamManagerApi()
    ret = streamManagerApi.InitManager()
    if ret != 0:
        print("The Stream manager failed to init, ret=", str(ret))
        exit()
    # create streams
    with open("./pipeline/fasterrcnn.pipeline", 'rb') as f1:
        pipelineStr = f1.read()
    ret = streamManagerApi.CreateMultipleStreams(pipelineStr)
    if ret != 0:
        print("The Stream cannot be created. ret=", str(ret))
        exit()
    # Construct the stream input
    dataInput = MxDataInput()
    FILE_PATH = " "  # set this to the input image path before running
    if not os.path.exists(FILE_PATH):
        print("The input picture cannot be found. Check the path.")
        streamManagerApi.DestroyAllStreams()
        exit()
    else:
        try:
            image = Image.open(FILE_PATH)
            if image.format != 'JPEG':
                print('The model only supports JPEG input; the current format is {}.'.format(image.format))
                streamManagerApi.DestroyAllStreams()
                exit()
            else:
                with open(FILE_PATH, 'rb') as f:
                    dataInput.data = f.read()
        except IOError:
            print('There is an IOError; the input may not be a picture. Please check it!')
            streamManagerApi.DestroyAllStreams()
            exit()
    # Inputs data to a specified stream.
    STREAM_NAME1 = b'model1'
    IN_PLUGIN_ID = 0
    uniqueId = streamManagerApi.SendData(
        STREAM_NAME1, IN_PLUGIN_ID, dataInput)
    # send image metadata
    META = get_img_metas(FILE_PATH).astype(np.float32).tobytes()
    KEY = b'appsrc1'
    visionList = MxpiDataType.MxpiVisionList()
    visionVec = visionList.visionVec.add()
    visionVec.visionData.deviceId = 0
    visionVec.visionData.memType = 0
    visionVec.visionData.dataStr = META
    protobuf = MxProtobufIn()
    protobuf.key = KEY
    protobuf.type = b'MxTools.MxpiVisionList'
    protobuf.protobuf = visionList.SerializeToString()
    protobufVec = InProtobufVector()
    protobufVec.push_back(protobuf)
    uniqueId1 = streamManagerApi.SendProtobuf(
        STREAM_NAME1, KEY, protobufVec)
    if uniqueId1 < 0:
        print("Failed to send data to stream.")
        exit()
    keyVec = StringVector()
    KEYS = b"mxpi_tensorinfer0"
    keyVec.push_back(KEYS)
    infer_result = streamManagerApi.GetProtobuf(STREAM_NAME1, 0, keyVec)
    # check the infer result
    if infer_result.size() == 0:
        print("infer_result is null")
        exit()
    if infer_result[0].errorCode != 0:
        print("GetProtobuf error. errorCode=%d, errorMsg=%s" % (
            infer_result[0].errorCode, infer_result[0].data.decode()))
        exit()
    tensorList = MxpiDataType.MxpiTensorPackageList()
    tensorList.ParseFromString(infer_result[0].messageBuf)
    pre_mask = np.frombuffer(
        tensorList.tensorPackageVec[0].tensorVec[2].dataStr,
        dtype=np.bool_)
    pre_mask = pre_mask.reshape((1, 80000, 1))
    pre_label = np.frombuffer(
        tensorList.tensorPackageVec[0].tensorVec[1].dataStr,
        dtype=np.uint32)
    pre_label = pre_label.reshape((1, 80000, 1))
    pre_bbox = np.frombuffer(
        tensorList.tensorPackageVec[0].tensorVec[0].dataStr,
        dtype=np.float16)
    pre_bbox = pre_bbox.reshape((1, 80000, 5))
    bbox_squee = np.squeeze(pre_bbox.reshape(80000, 5))
    label_squee = np.squeeze(pre_label.reshape(80000, 1))
    mask_squee = np.squeeze(pre_mask.reshape(80000, 1))
    all_bboxes_tmp_mask = bbox_squee[mask_squee, :]
    all_labels_tmp_mask = label_squee[mask_squee]
    # keep at most the 128 highest-scoring detections
    if all_bboxes_tmp_mask.shape[0] > 128:
        inds = np.argsort(-all_bboxes_tmp_mask[:, -1])  # indices in descending score order
        inds = inds[:128]
        all_bboxes_tmp_mask = all_bboxes_tmp_mask[inds]
        all_labels_tmp_mask = all_labels_tmp_mask[inds]
    outputs = []
    outputs_tmp, out_person = bbox2result_1image(
        all_bboxes_tmp_mask, all_labels_tmp_mask, 81)
    outputs.append(outputs_tmp)
    img = cv2.imread(FILE_PATH)
    box_person = ((int(out_person[0][0]), int(out_person[0][1])),
                  (int(out_person[0][2]), int(out_person[0][3])))
    ret2 = streamManagerApi.InitManager()
    if ret2 != 0:
        print("Failed to init Stream manager, ret=%s" % str(ret2))
        exit()
    # create streams by pipeline config file
    with open("./pipeline/hrnet.pipeline", 'rb') as f2:
        pipelineStr2 = f2.read()
    ret2 = streamManagerApi.CreateMultipleStreams(pipelineStr2)
    if ret2 != 0:
        print("Failed to create Stream, ret=%s" % str(ret2))
        exit()
    image_bgr = cv2.imread(FILE_PATH)
    image = image_bgr[:, :, [2, 1, 0]]  # BGR -> RGB
    center, scale = box_to_center_scale(box_person, 288, 384)
    image_pose = image.copy()
    # model 2 preprocess
    trans = get_affine_transform(center, scale, 0, [288, 384])
    model_input = cv2.warpAffine(
        image_pose,
        trans,
        (288, 384),
        flags=cv2.INTER_LINEAR)
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    data_test = normalize(model_input, mean, std)
    data_test = data_test.astype('float32')
    data_test = np.reshape(data_test, (1, 3, 384, 288))
    tensors = data_test
    STREAM_NAME_2 = b'model2'
    tensorPackageList = MxpiDataType.MxpiTensorPackageList()
    tensorPackage = tensorPackageList.tensorPackageVec.add()
    array_bytes = tensors.tobytes()
    dataInput = MxDataInput()
    dataInput.data = array_bytes
    tensorVec = tensorPackage.tensorVec.add()
    tensorVec.deviceId = 0
    tensorVec.memType = 0
    for i in tensors.shape:
        tensorVec.tensorShape.append(i)
    tensorVec.dataStr = dataInput.data
    tensorVec.tensorDataSize = len(array_bytes)
    KEY_2 = b'appsrc0'
    protobufVec = InProtobufVector()
    protobuf = MxProtobufIn()
    protobuf.key = KEY_2
    protobuf.type = b'MxTools.MxpiTensorPackageList'
    protobuf.protobuf = tensorPackageList.SerializeToString()
    protobufVec.push_back(protobuf)
    ret = streamManagerApi.SendProtobuf(
        STREAM_NAME_2, IN_PLUGIN_ID, protobufVec)
    if ret != 0:
        print("Failed to send data to stream.")
        exit()
    keyVec = StringVector()
    KEYS_2 = b"mxpi_tensorinfer0"
    keyVec.push_back(KEYS_2)
    infer_result = streamManagerApi.GetProtobuf(STREAM_NAME_2, 0, keyVec)
    tensorList = MxpiDataType.MxpiTensorPackageList()
    tensorList.ParseFromString(infer_result[0].messageBuf)
    keypoint_outputs = np.frombuffer(
        tensorList.tensorPackageVec[0].tensorVec[0].dataStr,
        dtype=np.float16)
    keypoint_outputs = keypoint_outputs.reshape((1, 80, 96, 72))
    cls_outputs = np.frombuffer(
        tensorList.tensorPackageVec[0].tensorVec[1].dataStr,
        dtype=np.float16)
    cls_outputs = cls_outputs.reshape((1, 23))
    # post_process
    if isinstance(keypoint_outputs, list):
        kp_output = keypoint_outputs[-1]
    else:
        kp_output = keypoint_outputs
    preds, maxvals = get_final_preds(
        kp_output,
        np.asarray([center]),
        np.asarray([scale]))
    filters = np.argmax(cls_outputs, axis=1)
    mask = mask_generate(filters, 80)
    preds_mask = preds * mask
    image_bgr = cv2.imread(FILE_PATH)
    if len(preds_mask) >= 1:
        for kpt in preds_mask:
            draw_pose(kpt, image_bgr)  # draw the poses
    SAVEPATH = 'output.jpg'
    cv2.imwrite(SAVEPATH, image_bgr)
    print('The result image has been saved as {}.'.format(SAVEPATH))
    # destroy streams
    streamManagerApi.DestroyAllStreams()