Files
li-zhenyun b2e0e056e8 fixed
2022-10-24 11:42:20 +08:00

118 lines
4.0 KiB
Python

#!/usr/bin/env python
# coding=utf-8
# Copyright(C) 2022. Huawei Technologies Co.,Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import stat
import time
import cv2
from cv2 import getTickCount, getTickFrequency
from PIL import Image
import MxpiDataType_pb2 as MxpiDataType
from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector
from visualize import plot_one_box
# Input/output locations. It is best to use absolute paths.
PIPELINE_PATH = "../pipeline/detect.pipeline"
FILENAME = "../data/0051.jpg"
RESULTFILE = "../data/output.jpg"

# Permission bits handed to os.open(); they only take effect if a file is
# created, which never happens with O_RDONLY. Kept for parity with the
# original call.
MODES = stat.S_IWUSR | stat.S_IRUSR

STREAM_NAME = b'detection'
INPLUGINID = 0
# Keys requested from GetProtobuf().
RESULT_KEYS = (b"mxpi_objectpostprocessor0",)


def read_binary(path):
    """Return the entire content of the file at *path* as bytes."""
    with os.fdopen(os.open(path, os.O_RDONLY, MODES), 'rb') as handle:
        return handle.read()


def is_jpeg(path):
    """Return True if PIL can open *path* and reports a JPEG/JPG format.

    Any exception raised while opening the image is treated as "not a JPEG",
    which mirrors the original script's handling.
    """
    try:
        # The context manager closes the underlying file handle.
        with Image.open(path) as image:
            return image.format in ("JPEG", "JPG")
    except Exception:
        return False


def to_bounding_box(obj):
    """Convert one detected object into a plain dict.

    *obj* must expose ``x0``, ``y0``, ``x1``, ``y1`` and a non-empty
    ``classVec`` whose first element has ``classId``, ``className`` and
    ``confidence``. Coordinates are truncated to ``int`` and the confidence
    is rounded to four decimal places.
    """
    top_class = obj.classVec[0]
    return {
        'x0': int(obj.x0),
        'x1': int(obj.x1),
        'y0': int(obj.y0),
        'y1': int(obj.y1),
        'class': int(top_class.classId),
        'class_name': top_class.className,
        'confidence': round(top_class.confidence, 4),
    }


def run_detection(stream_api):
    """Send FILENAME through the stream and write the result to RESULTFILE.

    *stream_api* is a stream manager whose streams have already been created.
    Returns 0 on success (including the "no object detected" case, where the
    input image is written out unchanged) and 1 on any reported failure.
    """
    if not os.path.exists(FILENAME):
        print("The test image does not exist. Exit.")
        return 1
    if not is_jpeg(FILENAME):
        print("the image is not JPG format")
        return 1

    data_input = MxDataInput()
    data_input.data = read_binary(FILENAME)

    # perf_counter() is monotonic, unlike time.time().
    started = time.perf_counter()
    unique_id = stream_api.SendData(STREAM_NAME, INPLUGINID, data_input)
    if unique_id < 0:
        print("Failed to send data to stream.")
        return 1

    key_vec = StringVector()
    for key in RESULT_KEYS:
        key_vec.push_back(key)
    result = stream_api.GetProtobuf(STREAM_NAME, 0, key_vec)

    if result.size() == 0:
        print("No object detected")
        cv2.imwrite(RESULTFILE, cv2.imread(FILENAME))
        return 0
    if result[0].errorCode != 0:
        print("GetProtobuf error. errorCode=%d, errorMsg=%s" % (
            result[0].errorCode, result[0].data.decode()))
        return 1

    # process data output from mxpi_objectpostprocessor plugin
    object_list = MxpiDataType.MxpiObjectList()
    object_list.ParseFromString(result[0].messageBuf)
    bounding_boxes = []
    for obj in object_list.objectVec:
        print(obj)
        bounding_boxes.append(to_bounding_box(obj))

    elapsed = time.perf_counter() - started
    if elapsed > 0:  # guard against a zero-length interval
        print("fps:", 1 / elapsed)

    # draw each bounding box on the original image
    img = cv2.imread(FILENAME)
    for box in bounding_boxes:
        plot_one_box(img,
                     [box['x0'], box['y0'], box['x1'], box['y1']],
                     cls_id=box['class'],
                     label=box['class_name'],
                     box_score=box['confidence'])
    cv2.imwrite(RESULTFILE, img)
    return 0


def main():
    """Initialise the stream manager, run one detection, and clean up.

    Returns the process exit status: 0 on success, 1 on failure.
    """
    stream_api = StreamManagerApi()
    # init stream manager
    ret = stream_api.InitManager()
    if ret != 0:
        print("Failed to init Stream manager, ret=%s" % str(ret))
        return 1

    # create streams by pipeline config file
    ret = stream_api.CreateMultipleStreams(read_binary(PIPELINE_PATH))
    if ret != 0:
        print("Failed to create Stream, ret=%s" % str(ret))
        return 1

    try:
        return run_detection(stream_api)
    finally:
        # destroy streams on every exit path, not only on success
        stream_api.DestroyAllStreams()


if __name__ == '__main__':
    raise SystemExit(main())