mirror of
https://github.com/Ascend/ascend_community_projects.git
synced 2025-09-27 04:05:54 +08:00
118 lines
4.0 KiB
Python
118 lines
4.0 KiB
Python
#!/usr/bin/env python
|
|
# coding=utf-8
|
|
|
|
# Copyright(C) 2022. Huawei Technologies Co.,Ltd. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import stat
|
|
import time
|
|
import cv2
|
|
from cv2 import getTickCount, getTickFrequency
|
|
from PIL import Image
|
|
import MxpiDataType_pb2 as MxpiDataType
|
|
from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
|
|
|
|
from visualize import plot_one_box
|
|
|
|
if __name__ == '__main__':
|
|
steammanager_api = StreamManagerApi()
|
|
# init stream manager
|
|
ret = steammanager_api.InitManager()
|
|
if ret != 0:
|
|
print("Failed to init Stream manager, ret=%s" % str(ret))
|
|
exit()
|
|
|
|
# create streams by pipeline config file
|
|
MODES = stat.S_IWUSR | stat.S_IRUSR
|
|
with os.fdopen(os.open("../pipeline/detect.pipeline", os.O_RDONLY, MODES), 'rb') as f:
|
|
pipeline_str = f.read()
|
|
ret = steammanager_api.CreateMultipleStreams(pipeline_str)
|
|
if ret != 0:
|
|
print("Failed to create Stream, ret=%s" % str(ret))
|
|
exit()
|
|
dataInput = MxDataInput()
|
|
# It is best to use absolute path
|
|
FILENAME = "../data/0051.jpg"
|
|
RESULTFILE = "../data/output.jpg"
|
|
|
|
if os.path.exists(FILENAME) != 1:
|
|
print("The test image does not exist. Exit.")
|
|
exit()
|
|
try :
|
|
image = Image.open(FILENAME)
|
|
except Exception as e :
|
|
print("the image is not JPG format")
|
|
exit()
|
|
if image.format != "JPEG" and image.format != "JPG":
|
|
print("the image is not JPG format")
|
|
exit()
|
|
|
|
with os.fdopen(os.open(FILENAME, os.O_RDONLY, MODES), 'rb') as f:
|
|
dataInput.data = f.read()
|
|
STEAMNAME = b'detection'
|
|
INPLUGINID = 0
|
|
t1 = time.time()
|
|
uniqueId = steammanager_api.SendData(STEAMNAME, INPLUGINID, dataInput)
|
|
if uniqueId < 0:
|
|
print("Failed to send data to stream.")
|
|
exit()
|
|
keys = [ b"mxpi_objectpostprocessor0"]
|
|
keyVec = StringVector()
|
|
for key in keys:
|
|
keyVec.push_back(key)
|
|
result = steammanager_api.GetProtobuf(STEAMNAME, 0, keyVec)
|
|
if result.size() == 0:
|
|
print("No object detected")
|
|
img = cv2.imread(FILENAME)
|
|
cv2.imwrite(RESULTFILE, img)
|
|
exit()
|
|
if result[0].errorCode != 0:
|
|
print("GetProtobuf error. errorCode=%d, errorMsg=%s" % (
|
|
result[0].errorCode, result[0].data.decode()))
|
|
exit()
|
|
# process data output from mxpi_objectpostprocessor plugin
|
|
object_list = MxpiDataType.MxpiObjectList()
|
|
object_list.ParseFromString(result[0].messageBuf)
|
|
bounding_boxes = []
|
|
for obj in object_list.objectVec:
|
|
print(obj)
|
|
box = {'x0': int(obj.x0),
|
|
'x1': int(obj.x1),
|
|
'y0': int(obj.y0),
|
|
'y1': int(obj.y1),
|
|
'class': int(obj.classVec[0].classId),
|
|
'class_name': obj.classVec[0].className,
|
|
'confidence': round(obj.classVec[0].confidence, 4)}
|
|
bounding_boxes.append(box)
|
|
print("fps:", 1/(time.time()-t1))
|
|
img = cv2.imread(FILENAME)
|
|
# draw each bounding box on the original image
|
|
for box in bounding_boxes:
|
|
class_id = box.get('class')
|
|
class_name = box.get('class_name')
|
|
score = box.get('confidence')
|
|
plot_one_box(img,
|
|
[box.get('x0'),
|
|
box.get('y0'),
|
|
box.get('x1'),
|
|
box.get('y1')],
|
|
cls_id=class_id,
|
|
label=class_name,
|
|
box_score=score)
|
|
cv2.imwrite(RESULTFILE, img)
|
|
# destroy streams
|
|
steammanager_api.DestroyAllStreams()
|
|
|