Merge pull request #602 from sam-dieck/master

Add multi-thread frame processing to daemon.
This commit is contained in:
Matthew Hill
2017-12-13 11:04:35 -05:00
committed by GitHub
6 changed files with 154 additions and 87 deletions

4
.gitignore vendored
View File

@@ -29,6 +29,10 @@ openalpr-*.asc
openalpr-*.sig openalpr-*.sig
openalpr-*.tar.gz openalpr-*.tar.gz
*.orig *.orig
.DS_Store
# vim editor files
*.swp
# Visual Studio files # Visual Studio files
src.sln src.sln

View File

@@ -16,6 +16,9 @@ country = us
; stream = http://127.0.0.1/example_second_stream.mjpeg ; stream = http://127.0.0.1/example_second_stream.mjpeg
; stream = webcam ; stream = webcam
; Number of threads to analyze frames.
analysis_threads = 4
; topn is the number of possible plate character variations to report ; topn is the number of possible plate character variations to report
topn = 10 topn = 10

View File

@@ -7,6 +7,7 @@
#include "daemon/beanstalk.hpp" #include "daemon/beanstalk.hpp"
#include "video/logging_videobuffer.h" #include "video/logging_videobuffer.h"
#include "daemon/daemonconfig.h" #include "daemon/daemonconfig.h"
#include "inc/safequeue.h"
#include "tclap/CmdLine.h" #include "tclap/CmdLine.h"
#include "alpr.h" #include "alpr.h"
@@ -23,7 +24,10 @@
using namespace alpr; using namespace alpr;
// prototypes // Variables
SafeQueue<cv::Mat> framesQueue;
// Prototypes
void streamRecognitionThread(void* arg); void streamRecognitionThread(void* arg);
bool writeToQueue(std::string jsonResult); bool writeToQueue(std::string jsonResult);
bool uploadPost(CURL* curl, std::string url, std::string data); bool uploadPost(CURL* curl, std::string url, std::string data);
@@ -45,6 +49,7 @@ struct CaptureThreadData
std::string stream_url; std::string stream_url;
std::string site_id; std::string site_id;
int camera_id; int camera_id;
int analysis_threads;
bool clock_on; bool clock_on;
@@ -200,6 +205,7 @@ int main( int argc, const char** argv )
tdata->country_code = daemon_config.country; tdata->country_code = daemon_config.country;
tdata->company_id = daemon_config.company_id; tdata->company_id = daemon_config.company_id;
tdata->site_id = daemon_config.site_id; tdata->site_id = daemon_config.site_id;
tdata->analysis_threads = daemon_config.analysis_threads;
tdata->top_n = daemon_config.topn; tdata->top_n = daemon_config.topn;
tdata->pattern = daemon_config.pattern; tdata->pattern = daemon_config.pattern;
tdata->clock_on = clockOn; tdata->clock_on = clockOn;
@@ -213,6 +219,7 @@ int main( int argc, const char** argv )
UploadThreadData* udata = new UploadThreadData(); UploadThreadData* udata = new UploadThreadData();
udata->upload_url = daemon_config.upload_url; udata->upload_url = daemon_config.upload_url;
tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata );
threads.push_back(thread_upload); threads.push_back(thread_upload);
} }
@@ -231,82 +238,56 @@ int main( int argc, const char** argv )
} }
void streamRecognitionThread(void* arg) void processingThread(void* arg)
{ {
CaptureThreadData* tdata = (CaptureThreadData*) arg; CaptureThreadData* tdata = (CaptureThreadData*) arg;
LOG4CPLUS_INFO(logger, "country: " << tdata->country_code << " -- config file: " << tdata->config_file );
LOG4CPLUS_INFO(logger, "pattern: " << tdata->pattern);
LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url);
Alpr alpr(tdata->country_code, tdata->config_file); Alpr alpr(tdata->country_code, tdata->config_file);
alpr.setTopN(tdata->top_n); alpr.setTopN(tdata->top_n);
alpr.setDefaultRegion(tdata->pattern); alpr.setDefaultRegion(tdata->pattern);
while (daemon_active) {
int framenum = 0; // Wait for a new frame
cv::Mat frame = framesQueue.pop();
LoggingVideoBuffer videoBuffer(logger);
videoBuffer.connect(tdata->stream_url, 5);
cv::Mat latestFrame;
std::vector<uchar> buffer;
LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id);
while (daemon_active)
{
std::vector<cv::Rect> regionsOfInterest;
int response = videoBuffer.getLatestFrame(&latestFrame, regionsOfInterest);
if (response != -1)
{
// Process new frame
timespec startTime; timespec startTime;
getTimeMonotonic(&startTime); getTimeMonotonic(&startTime);
std::vector<AlprRegionOfInterest> regionsOfInterest; std::vector<AlprRegionOfInterest> regionsOfInterest;
regionsOfInterest.push_back(AlprRegionOfInterest(0,0, latestFrame.cols, latestFrame.rows)); regionsOfInterest.push_back(AlprRegionOfInterest(0,0, frame.cols, frame.rows));
AlprResults results = alpr.recognize(latestFrame.data, latestFrame.elemSize(), latestFrame.cols, latestFrame.rows, regionsOfInterest); AlprResults results = alpr.recognize(frame.data, frame.elemSize(), frame.cols, frame.rows, regionsOfInterest);
timespec endTime; timespec endTime;
getTimeMonotonic(&endTime); getTimeMonotonic(&endTime);
double totalProcessingTime = diffclock(startTime, endTime); double totalProcessingTime = diffclock(startTime, endTime);
if (tdata->clock_on) if (tdata->clock_on) {
{
LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms.");
} }
if (results.plates.size() > 0) if (results.plates.size() > 0) {
{
std::stringstream uuid_ss; std::stringstream uuid_ss;
uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs(); uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs();
std::string uuid = uuid_ss.str(); std::string uuid = uuid_ss.str();
// Save the image to disk (using the UUID) // Save the image to disk (using the UUID)
if (tdata->output_images) if (tdata->output_images) {
{
std::stringstream ss; std::stringstream ss;
ss << tdata->output_image_folder << "/" << uuid << ".jpg"; ss << tdata->output_image_folder << "/" << uuid << ".jpg";
cv::imwrite(ss.str(), frame);
cv::imwrite(ss.str(), latestFrame);
} }
// Update the JSON content to include UUID and camera ID // Update the JSON content to include UUID and camera ID
std::string json = alpr.toJson(results); std::string json = alpr.toJson(results);
cJSON *root = cJSON_Parse(json.c_str()); cJSON *root = cJSON_Parse(json.c_str());
cJSON_AddStringToObject(root, "uuid", uuid.c_str()); cJSON_AddStringToObject(root, "uuid", uuid.c_str());
cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id);
cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str());
cJSON_AddNumberToObject(root, "img_width", latestFrame.cols); cJSON_AddNumberToObject(root, "img_width", frame.cols);
cJSON_AddNumberToObject(root, "img_height", latestFrame.rows); cJSON_AddNumberToObject(root, "img_height", frame.rows);
// Add the company ID to the output if configured // Add the company ID to the output if configured
if (tdata->company_id.length() > 0) if (tdata->company_id.length() > 0)
@@ -328,17 +309,54 @@ void streamRecognitionThread(void* arg)
writeToQueue(response); writeToQueue(response);
} }
usleep(10000);
}
}
void streamRecognitionThread(void* arg)
{
CaptureThreadData* tdata = (CaptureThreadData*) arg;
LOG4CPLUS_INFO(logger, "country: " << tdata->country_code << " -- config file: " << tdata->config_file );
LOG4CPLUS_INFO(logger, "pattern: " << tdata->pattern);
LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url);
/* Create processing threads */
const int num_threads = tdata->analysis_threads;
tthread::thread* threads[num_threads];
for (int i = 0; i < num_threads; i++) {
LOG4CPLUS_INFO(logger, "Spawning Thread " << i );
tthread::thread* t = new tthread::thread(processingThread, (void*) tdata);
threads[i] = t;
}
cv::Mat frame;
LoggingVideoBuffer videoBuffer(logger);
videoBuffer.connect(tdata->stream_url, 5);
LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id);
while (daemon_active)
{
std::vector<cv::Rect> regionsOfInterest;
int response = videoBuffer.getLatestFrame(&frame, regionsOfInterest);
if (response != -1) {
if (framesQueue.empty()) {
framesQueue.push(frame.clone());
}
} }
usleep(10000); usleep(10000);
} }
videoBuffer.disconnect(); videoBuffer.disconnect();
LOG4CPLUS_INFO(logger, "Video processing ended"); LOG4CPLUS_INFO(logger, "Video processing ended");
delete tdata; delete tdata;
for (int i = 0; i < num_threads; i++) {
delete threads[i];
}
} }

View File

@@ -46,6 +46,7 @@ DaemonConfig::DaemonConfig(std::string config_file, std::string config_defaults_
country = getString(&ini, &defaultIni, "daemon", "country", "us"); country = getString(&ini, &defaultIni, "daemon", "country", "us");
topn = getInt(&ini, &defaultIni, "daemon", "topn", 20); topn = getInt(&ini, &defaultIni, "daemon", "topn", 20);
analysis_threads = getInt(&ini, &defaultIni, "daemon", "analysis_threads", 1);
storePlates = getBoolean(&ini, &defaultIni, "daemon", "store_plates", false); storePlates = getBoolean(&ini, &defaultIni, "daemon", "store_plates", false);
imageFolder = getString(&ini, &defaultIni, "daemon", "store_plates_location", "/tmp/"); imageFolder = getString(&ini, &defaultIni, "daemon", "store_plates_location", "/tmp/");

View File

@@ -16,6 +16,7 @@ public:
std::string country; std::string country;
int topn; int topn;
int analysis_threads;
bool storePlates; bool storePlates;
std::string imageFolder; std::string imageFolder;
bool uploadData; bool uploadData;

40
src/inc/safequeue.h Normal file
View File

@@ -0,0 +1,40 @@
#ifndef SAFE_QUEUE_H_
#define SAFE_QUEUE_H_
#include <queue>
#include "support/tinythread.h"
template <typename T>
class SafeQueue
{
public:
T pop()
{
tthread::lock_guard<tthread::mutex> mlock(_mutex);
while (_queue.empty()) {
_cond.wait(_mutex);
}
T val = _queue.front();
_queue.pop();
return val;
}
void push(const T& item)
{
tthread::lock_guard<tthread::mutex> mlock(_mutex);
_queue.push(item);
_cond.notify_one();
}
bool empty()
{
return _queue.empty();
}
private:
std::queue<T> _queue;
tthread::mutex _mutex;
tthread::condition_variable _cond;
};
#endif