#!/usr/bin/env python
# coding=utf-8
# Copyright(C) 2022. Huawei Technologies Co.,Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import argparse
import os
import os.path as osp
import colorsys
import shutil
import cv2
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from utils.utils_bbox import BBoxUtility
from utils.anchors import get_anchors
from utils.utils_map import MakeJson, prep_metrics
from utils.utils import cvtcolor, resize_image, get_classes, get_coco_label_map, preprocess_input
import numpy as np
import MxpiDataType_pb2 as MxpiDataType
from PIL import Image
from StreamManagerApi import StreamManagerApi, StringVector, InProtobufVector, MxProtobufIn


def str2bool(v):
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')


def parse_args():
    parser = argparse.ArgumentParser(description='YOLACT inference with the MindX SDK')
    # Datasets
    parser.add_argument('--image', default=None, type=str,
                        help='An input image and the output path for the detected image. '
                             'Should be in the format input:output.')
    parser.add_argument('--PL_PATH', default='./model_data/yolact.pipeline', type=str,
                        help='pipeline path')
    parser.add_argument('--display_text', default=True, type=str2bool,
                        help='Whether or not to display text (class [score])')
    parser.add_argument('--display_bboxes', default=True, type=str2bool,
                        help='Whether or not to display bboxes around masks')
    parser.add_argument('--fast_nms', default=True, type=str2bool,
                        help='Whether to use a faster, but not entirely correct version of NMS.')
    parser.add_argument('--display_scores', default=True, type=str2bool,
                        help='Whether or not to display scores in addition to classes')
    parser.add_argument('--trained_model',
                        default='weights/ssd300_mAP_77.43_v2.pth', type=str,
                        help='Trained state_dict file path to open. If "interrupt", this will open the interrupt file.')
    parser.add_argument('--display_masks', default=True, type=str2bool,
                        help='Whether or not to display masks over bounding boxes')
    parser.add_argument('--display_lincomb', default=False, type=str2bool,
                        help='If the config uses lincomb masks, output a visualization of how those masks are created.')
    parser.add_argument('--score_threshold', default=0.3, type=float,
                        help='Detections with a score under this threshold will not be considered. '
                             'This currently only works in display mode.')
    parser.add_argument('--top_k', default=100, type=int,
                        help='Further restrict the number of predictions to parse')
    parser.add_argument('--ap_data_file', default='results/ap_data.pkl', type=str,
                        help='In quantitative mode, the file to save detections before calculating mAP.')
    parser.add_argument('--confidence', default=0.05, type=float,
                        help='confidence threshold')
    parser.add_argument('--nms_iou', default=0.5, type=float,
                        help='NMS IoU threshold')
    parser.add_argument('--traditional_nms', default=False, type=str2bool,
                        help='Whether to use traditional per-class NMS instead of Fast NMS.')
    parser.add_argument('--classes_path', default='./model_data/coco_classes.txt', type=str,
                        help='classes path')
    parser.set_defaults(no_bar=False, display=False, resume=False, output_coco_json=False,
                        output_web_json=False, shuffle=False,
                        benchmark=False, no_sort=False, no_hash=False, mask_proto_debug=False,
                        crop=True, detect=False, display_fps=False,
                        emulate_playback=False)
    par_args = parser.parse_args()
    return par_args


iou_thresholds = [x / 100 for x in range(50, 100, 5)]
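
# Example invocations (the script file name below is assumed, not taken from the
# repository; substitute the actual entry-point name):
#   single image:  python3 yolact_infer.py --image input.jpg:output.jpg
#   COCO val2017:  python3 yolact_infer.py --PL_PATH ./model_data/yolact.pipeline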


def send_source_data(appsrc_id, tensor, stream_name, stream_manager):
    """Pack a batch tensor into an MxpiTensorPackageList and send it to the given appsrc."""
    tensor_package_list = MxpiDataType.MxpiTensorPackageList()
    for i in range(tensor.shape[0]):
        da = np.expand_dims(tensor[i, :], 0)
        tensor_pack = tensor_package_list.tensorPackageVec.add()
        ten_vec = tensor_pack.tensorVec.add()
        ten_vec.deviceId = 0
        ten_vec.memType = 0
        ten_vec.tensorShape.extend(da.shape)
        ten_vec.dataStr = da.tobytes()
        # tensorDataSize is the byte length of dataStr, not the first dimension
        ten_vec.tensorDataSize = da.nbytes
    keys = "appsrc{}".format(appsrc_id).encode('utf-8')
    protobu_vec = InProtobufVector()
    protobu = MxProtobufIn()
    protobu.key = keys
    protobu.type = b'MxTools.MxpiTensorPackageList'
    protobu.protobuf = tensor_package_list.SerializeToString()
    protobu_vec.push_back(protobu)
    ret = stream_manager.SendProtobuf(stream_name, appsrc_id, protobu_vec)
    if ret < 0:
        print("Failed to send data to stream.")
        return False
    print('success')
    return True


def evalimage(stream_manager_api, path: str, save_path: str = None):
    if not os.path.exists(path):
        print("Picture doesn't exist, please try again")
        exit()
    if os.path.splitext(path)[-1] != '.jpg':
        print("The image should be in .jpg format")
        exit()
    image = Image.open(path)
    image_shape = np.array(np.shape(image)[0:2])
    image = cvtcolor(image)
    image_origin = np.array(image, np.uint8)
    image_data = resize_image(image, (544, 544))
    batch = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, np.float32)), (2, 0, 1)), 0)
    batch = batch.astype(np.float32)
    stream_name = b'im_yolact'
    in_plugin_id = 0
    if not send_source_data(0, batch, stream_name, stream_manager_api):
        return None
    keys = [b"mxpi_tensorinfer0"]
    key_vec = StringVector()
    for key in keys:
        key_vec.push_back(key)
    infer_results = stream_manager_api.GetProtobuf(stream_name, in_plugin_id, key_vec)
    if infer_results.size() == 0:
        print("infer_result is null")
        exit()
    if infer_results[0].errorCode != 0:
        print("GetProtobuf error. errorCode=%d, errorMsg=%s" % (
            infer_results[0].errorCode, infer_results[0].data.decode()))
        exit()
    result_list = MxpiDataType.MxpiTensorPackageList()
    result_list.ParseFromString(infer_results[0].messageBuf)
    pred_boxes = np.array(
        [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[0].dataStr, dtype=np.float32)])
    pred_boxes = pred_boxes.reshape(1, 18525, 4)
    pred_classes = np.array(
        [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[1].dataStr, dtype=np.float32)])
    pred_classes = pred_classes.reshape(1, 18525, 81)
    pred_masks = np.array(
        [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[2].dataStr, dtype=np.float32)])
    pred_masks = pred_masks.reshape(1, 18525, 32)
    pred_proto = np.array(
        [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[3].dataStr, dtype=np.float32)])
    pred_proto = pred_proto.reshape(1, 136, 136, 32)
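    # Shape note (derived from the values above): with a 544x544 input and anchor
    # scales [24, 48, 96, 192, 384], YOLACT predicts over five feature maps of
    # 68^2 + 34^2 + 17^2 + 9^2 + 5^2 = 6175 cells; with 3 aspect ratios per cell
    # that gives 3 * 6175 = 18525 anchors. The 136x136 prototype masks are at
    # 1/4 of the input resolution.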
    detect = BBoxUtility()
    outputs = (pred_boxes, pred_classes, pred_masks, pred_proto)
    #----------------------------------------------------------------------#
    #   Decide each pixel's class from the instance it belongs to and from
    #   whether it passes the score threshold.
    #----------------------------------------------------------------------#
    class_names, num_classes = get_classes(args.classes_path)
    num_classes += 1
    anchors = get_anchors([544, 544], [24, 48, 96, 192, 384])
    #---------------------------------------------------#
    #   Assign a different color to each class for drawing
    #---------------------------------------------------#
    if num_classes <= 81:
        colors = np.array([[0, 0, 0], [244, 67, 54], [233, 30, 99], [156, 39, 176], [103, 58, 183],
                           [100, 30, 60], [63, 81, 181], [33, 150, 243], [3, 169, 244], [0, 188, 212],
                           [20, 55, 200], [0, 150, 136], [76, 175, 80], [139, 195, 74], [205, 220, 57],
                           [70, 25, 100], [255, 235, 59], [255, 193, 7], [255, 152, 0], [255, 87, 34],
                           [90, 155, 50], [121, 85, 72], [158, 158, 158], [96, 125, 139], [15, 67, 34],
                           [98, 55, 20], [21, 82, 172], [58, 128, 255], [196, 125, 39], [75, 27, 134],
                           [90, 125, 120], [121, 82, 7], [158, 58, 8], [96, 25, 9], [115, 7, 234],
                           [8, 155, 220], [221, 25, 72], [188, 58, 158], [56, 175, 19], [215, 67, 64],
                           [198, 75, 20], [62, 185, 22], [108, 70, 58], [160, 225, 39], [95, 60, 144],
                           [78, 155, 120], [101, 25, 142], [48, 198, 28], [96, 225, 200], [150, 167, 134],
                           [18, 185, 90], [21, 145, 172], [98, 68, 78], [196, 105, 19], [215, 67, 84],
                           [130, 115, 170], [255, 0, 255], [255, 255, 0], [196, 185, 10], [95, 167, 234],
                           [18, 25, 190], [0, 255, 255], [255, 0, 0], [0, 255, 0], [0, 0, 255],
                           [155, 0, 0], [0, 155, 0], [0, 0, 155], [46, 22, 130], [255, 0, 155],
                           [155, 0, 255], [255, 155, 0], [155, 255, 0], [0, 155, 255], [0, 255, 155],
                           [18, 5, 40], [120, 120, 255], [255, 58, 30], [60, 45, 60], [75, 27, 244], [128, 25, 70]],
                          dtype='uint8')
    else:
        hsv_tuples = [(x / num_classes, 1., 1.) for x in range(num_classes)]
        colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
        colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
    # Display mode uses fixed, stricter thresholds than the CLI defaults.
    confidence = 0.5
    nms_iou = 0.3
    results = detect.decode_nms(outputs, anchors, confidence, nms_iou, image_shape, args.traditional_nms)
    if results[0] is None:
        return image
    box_thre, class_thre, class_ids, masks_arg, masks_sigmoid = results
    masks_class = masks_sigmoid * (class_ids[None, None, :] + 1)
    masks_class = np.reshape(masks_class, [-1, np.shape(masks_sigmoid)[-1]])
    masks_class = np.reshape(masks_class[np.arange(np.shape(masks_class)[0]),
                                         np.reshape(masks_arg, [-1])], [image_shape[0], image_shape[1]])
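    # masks_class collapses the per-instance masks into one (H, W) class map:
    # each sigmoid mask is weighted by (class_id + 1), then masks_arg (the index
    # of the instance owning each pixel) gathers the winning value, so a pixel
    # holds 1 + the class id of its instance, or 0 for background.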
    #---------------------------------------------------------#
    #   Set the font and the bounding-box line thickness
    #---------------------------------------------------------#
    scale = 0.6
    thickness = int(max((image.size[0] + image.size[1]) // np.mean([544, 544]), 1))
    font = cv2.FONT_HERSHEY_DUPLEX
    color_masks = colors[masks_class].astype('uint8')
    image_fused = cv2.addWeighted(color_masks, 0.4, image_origin, 0.6, gamma=0)
    for i in range(np.shape(class_ids)[0]):
        left, top, right, bottom = np.array(box_thre[i, :], np.int32)
        #---------------------------------------------------------#
        #   Pick the class color and draw the predicted box
        #---------------------------------------------------------#
        color = colors[class_ids[i] + 1].tolist()
        cv2.rectangle(image_fused, (left, top), (right, bottom), color, thickness)
        #---------------------------------------------------------#
        #   Get the class of this box and write it on the image
        #---------------------------------------------------------#
        class_name = class_names[class_ids[i]]
        text_str = f'{class_name}: {class_thre[i]:.2f}'
        text_w, text_h = cv2.getTextSize(text_str, font, scale, 1)[0]
        cv2.rectangle(image_fused, (left, top), (left + text_w, top + text_h + 5), color, -1)
        cv2.putText(image_fused, text_str, (left, top + 15), font, scale, (255, 255, 255), 1, cv2.LINE_AA)
    image = Image.fromarray(np.uint8(image_fused))
    # Write the visualization to save_path if one was given, else to img.jpg.
    image.save(save_path if save_path else "img.jpg")
    return None


def val(val_args):
    if os.path.exists('./map_out'):
        shutil.rmtree('./map_out')
    # init streams
    str_man_api = StreamManagerApi()
    ret = str_man_api.InitManager()
    if ret != 0:
        print("Failed to init Stream manager, ret=%s" % str(ret))
        exit()
    # create streams by pipeline config file
    with open(val_args.PL_PATH, 'rb') as pl:
        pipeline_str = pl.read()
    ret = str_man_api.CreateMultipleStreams(pipeline_str)
    if ret != 0:
        print("Failed to create streams, ret=%s" % str(ret))
        exit()
    annotationfile = './data/coco/annotations/instances_val2017.json'
    coco_gt = COCO(annotationfile)
    imagefolder = './data/coco/images'
    image_ids = list(coco_gt.imgToAnns.keys())
    #-------------------------------------------------------#
    #   Paths of the test images and labels; by default they
    #   point to the data/coco folder under the project root.
    #-------------------------------------------------------#
    json_path = "./data/coco/annotations/instances_val2017.json"
    map_out_path = 'map_out'
    test = COCO(json_path)
    class_names, _ = get_classes(val_args.classes_path)
    coco_label_map = get_coco_label_map(test, class_names)
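    # coco_label_map translates the model's contiguous class indices back into
    # the original (non-contiguous) COCO category ids that COCOeval expects.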
    if val_args.image is not None:
        if ':' in val_args.image:
            inp, out = val_args.image.split(':')
            evalimage(str_man_api, inp, out)
        else:
            evalimage(str_man_api, val_args.image)
        return
    if not osp.exists(map_out_path):
        os.makedirs(map_out_path)
    print("Get predict result.")
    m_json = MakeJson(map_out_path, coco_label_map)
    for image_idx, image_id in enumerate(image_ids):
        image_info = coco_gt.loadImgs(image_id)[0]
        image_path = os.path.join(imagefolder, image_info['file_name'])
        if not os.path.exists(image_path):
            print("The test image does not exist. Exit.")
            exit()
        print('Detect image %d: %s, image id: %d.' % (image_idx, image_info['file_name'], image_id))
        image = Image.open(image_path)
        image_shape = np.array(np.shape(image)[0:2])
        image = cvtcolor(image)
        image_data = resize_image(image, (544, 544))
        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, np.float32)), (2, 0, 1)), 0)
        batch = image_data.astype(np.float32)
        stre_name = b'im_yolact'
        in_plugin_id = 0
        if not send_source_data(0, batch, stre_name, str_man_api):
            return
        kes = [b"mxpi_tensorinfer0"]
        ke_vec = StringVector()
        for key in kes:
            ke_vec.push_back(key)
        infer_results = str_man_api.GetProtobuf(stre_name, in_plugin_id, ke_vec)
        if infer_results.size() == 0:
            print("infer_result is null")
            exit()
        if infer_results[0].errorCode != 0:
            print("GetProtobuf error. errorCode=%d, errorMsg=%s" % (
                infer_results[0].errorCode, infer_results[0].data.decode()))
            exit()
        result_list = MxpiDataType.MxpiTensorPackageList()
        result_list.ParseFromString(infer_results[0].messageBuf)
        pred_boxes = np.array(
            [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[0].dataStr, dtype=np.float32)])
        pred_boxes = pred_boxes.reshape(1, 18525, 4)
        pred_classes = np.array(
            [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[1].dataStr, dtype=np.float32)])
        pred_classes = pred_classes.reshape(1, 18525, 81)
        pred_masks = np.array(
            [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[2].dataStr, dtype=np.float32)])
        pred_masks = pred_masks.reshape(1, 18525, 32)
        pred_proto = np.array(
            [np.frombuffer(result_list.tensorPackageVec[0].tensorVec[3].dataStr, dtype=np.float32)])
        pred_proto = pred_proto.reshape(1, 136, 136, 32)
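        # Same output layout as in evalimage(); see the shape note there for
        # where 18525 and 136 come from.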
        anchor = get_anchors([544, 544], [24, 48, 96, 192, 384])
        detect = BBoxUtility()
        outputs = (pred_boxes, pred_classes, pred_masks, pred_proto)
        results = detect.decode_nms(outputs, anchor, val_args.confidence, val_args.nms_iou,
                                    image_shape, val_args.traditional_nms)
        if results[0] is None:
            continue
        box_thre, class_thre, class_ids, masks_arg, masks_sigmoid = results
        if box_thre is None:
            continue
        prep_metrics(box_thre, class_thre, class_ids, masks_sigmoid, image_id, m_json)
    m_json.dump()
    print('\nJson files dumped, saved in: \'%s/\', start evaluating.' % map_out_path)
    bbox = test.loadRes(osp.join(map_out_path, "bbox_detections.json"))
    mask = test.loadRes(osp.join(map_out_path, "mask_detections.json"))
    print('\nBBoxes:')
    bbox_eval = COCOeval(test, bbox, 'bbox')
    bbox_eval.evaluate()
    bbox_eval.accumulate()
    bbox_eval.summarize()
    print('\nMasks:')
    mask_eval = COCOeval(test, mask, 'segm')
    mask_eval.evaluate()
    mask_eval.accumulate()
    mask_eval.summarize()


if __name__ == '__main__':
    args = parse_args()
    if not os.path.exists('results'):
        os.makedirs('results')
    val(val_args=args)