From b4d3c519ffc2decb3fb2ea6eeace062ae2aa9b6c Mon Sep 17 00:00:00 2001 From: Sam Dieck Date: Sat, 18 Nov 2017 23:09:25 -0600 Subject: [PATCH 1/4] Added multi-threaded frame analysis to daemon --- .gitignore | 4 + src/CMakeLists.txt | 1 + src/daemon.cpp | 191 ++++++++++++++++++++---------------- src/daemon/daemonconfig.cpp | 1 + src/daemon/daemonconfig.h | 1 + src/inc/safequeue.h | 48 +++++++++ 6 files changed, 159 insertions(+), 87 deletions(-) create mode 100644 src/inc/safequeue.h diff --git a/.gitignore b/.gitignore index 1e3f1c8..bcb0675 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,10 @@ openalpr-*.asc openalpr-*.sig openalpr-*.tar.gz *.orig +.DS_Store + +# vim editor files +*.swp # Visual Studio files src.sln diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c1df54e..960ee27 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,6 +130,7 @@ ELSE() ENDIF() +set (CMAKE_CXX_STANDARD 11) set(CMAKE_CSS_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall ") if (NOT IOS) ADD_EXECUTABLE( alpr main.cpp ) diff --git a/src/daemon.cpp b/src/daemon.cpp index 42c6206..26a8c2d 100644 --- a/src/daemon.cpp +++ b/src/daemon.cpp @@ -7,6 +7,7 @@ #include "daemon/beanstalk.hpp" #include "video/logging_videobuffer.h" #include "daemon/daemonconfig.h" +#include "inc/safequeue.h" #include "tclap/CmdLine.h" #include "alpr.h" @@ -23,7 +24,10 @@ using namespace alpr; -// prototypes +// Variables +SafeQueue framesQueue; + +// Prototypes void streamRecognitionThread(void* arg); bool writeToQueue(std::string jsonResult); bool uploadPost(CURL* curl, std::string url, std::string data); @@ -45,6 +49,7 @@ struct CaptureThreadData std::string stream_url; std::string site_id; int camera_id; + int analysis_threads; bool clock_on; @@ -200,6 +205,7 @@ int main( int argc, const char** argv ) tdata->country_code = daemon_config.country; tdata->company_id = daemon_config.company_id; tdata->site_id = daemon_config.site_id; + tdata->analysis_threads = daemon_config.analysis_threads; tdata->top_n = daemon_config.topn; tdata->pattern = daemon_config.pattern; tdata->clock_on = clockOn; @@ -209,10 +215,11 @@ int main( int argc, const char** argv ) if (daemon_config.uploadData) { - // Kick off the data upload thread - UploadThreadData* udata = new UploadThreadData(); - udata->upload_url = daemon_config.upload_url; - tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); + // Kick off the data upload thread + UploadThreadData* udata = new UploadThreadData(); + udata->upload_url = daemon_config.upload_url; + tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); + threads.push_back(thread_upload); } @@ -231,6 +238,81 @@ int main( int argc, const char** argv ) } +void processingThread(void* arg) +{ + CaptureThreadData* tdata = (CaptureThreadData*) arg; + Alpr alpr(tdata->country_code, tdata->config_file); + alpr.setTopN(tdata->top_n); + alpr.setDefaultRegion(tdata->pattern); + + while (daemon_active) { + + // Wait for a new frame + cv::Mat frame = framesQueue.pop(); + + // Process new frame + timespec startTime; + getTimeMonotonic(&startTime); + + std::vector regionsOfInterest; + regionsOfInterest.push_back(AlprRegionOfInterest(0,0, frame.cols, frame.rows)); + + AlprResults results = alpr.recognize(frame.data, frame.elemSize(), frame.cols, frame.rows, regionsOfInterest); + + timespec endTime; + getTimeMonotonic(&endTime); + double totalProcessingTime = diffclock(startTime, endTime); + + if (tdata->clock_on) { + LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); + } + + if (results.plates.size() > 0) { + + std::stringstream uuid_ss; + uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs(); + std::string uuid = uuid_ss.str(); + + // Save the image to disk (using the UUID) + if (tdata->output_images) { + std::stringstream ss; + ss << tdata->output_image_folder << "/" << uuid << ".jpg"; + cv::imwrite(ss.str(), frame); + } + + // Update the JSON content to include UUID and camera ID + std::string json = alpr.toJson(results); + cJSON *root = cJSON_Parse(json.c_str()); + cJSON_AddStringToObject(root, "uuid", uuid.c_str()); + cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); + cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); + cJSON_AddNumberToObject(root, "img_width", frame.cols); + cJSON_AddNumberToObject(root, "img_height", frame.rows); + + // Add the company ID to the output if configured + if (tdata->company_id.length() > 0) + cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str()); + + char *out; + out=cJSON_PrintUnformatted(root); + cJSON_Delete(root); + + std::string response(out); + + free(out); + + // Push the results to the Beanstalk queue + for (int j = 0; j < results.plates.size(); j++) + { + LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue."); + } + + writeToQueue(response); + } + } +} + + void streamRecognitionThread(void* arg) { CaptureThreadData* tdata = (CaptureThreadData*) arg; @@ -239,106 +321,41 @@ void streamRecognitionThread(void* arg) LOG4CPLUS_INFO(logger, "pattern: " << tdata->pattern); LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url); - Alpr alpr(tdata->country_code, tdata->config_file); - alpr.setTopN(tdata->top_n); - alpr.setDefaultRegion(tdata->pattern); - - - int framenum = 0; + /* Create processing threads */ + const int num_threads = tdata->analysis_threads; + tthread::thread* threads[num_threads]; + + for (int i = 0; i < num_threads; i++) { + LOG4CPLUS_INFO(logger, "Spawning Thread " << i ); + tthread::thread* t = new tthread::thread(processingThread, (void*) tdata); + threads[i] = t; + } + cv::Mat frame; LoggingVideoBuffer videoBuffer(logger); - videoBuffer.connect(tdata->stream_url, 5); - - cv::Mat latestFrame; - - std::vector buffer; - LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id); while (daemon_active) { std::vector regionsOfInterest; - int response = videoBuffer.getLatestFrame(&latestFrame, regionsOfInterest); + int response = videoBuffer.getLatestFrame(&frame, regionsOfInterest); - if (response != -1) - { - - timespec startTime; - getTimeMonotonic(&startTime); - - std::vector regionsOfInterest; - regionsOfInterest.push_back(AlprRegionOfInterest(0,0, latestFrame.cols, latestFrame.rows)); - - AlprResults results = alpr.recognize(latestFrame.data, latestFrame.elemSize(), latestFrame.cols, latestFrame.rows, regionsOfInterest); - - timespec endTime; - getTimeMonotonic(&endTime); - double totalProcessingTime = diffclock(startTime, endTime); - - if (tdata->clock_on) - { - LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); - } - - if (results.plates.size() > 0) - { - - std::stringstream uuid_ss; - uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs(); - std::string uuid = uuid_ss.str(); - - // Save the image to disk (using the UUID) - if (tdata->output_images) - { - std::stringstream ss; - ss << tdata->output_image_folder << "/" << uuid << ".jpg"; - - cv::imwrite(ss.str(), latestFrame); - } - - // Update the JSON content to include UUID and camera ID - - std::string json = alpr.toJson(results); - - cJSON *root = cJSON_Parse(json.c_str()); - cJSON_AddStringToObject(root, "uuid", uuid.c_str()); - cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); - cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); - cJSON_AddNumberToObject(root, "img_width", latestFrame.cols); - cJSON_AddNumberToObject(root, "img_height", latestFrame.rows); - - // Add the company ID to the output if configured - if (tdata->company_id.length() > 0) - cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str()); - - char *out; - out=cJSON_PrintUnformatted(root); - cJSON_Delete(root); - - std::string response(out); - - free(out); - - // Push the results to the Beanstalk queue - for (int j = 0; j < results.plates.size(); j++) - { - LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue."); - } - - writeToQueue(response); + if (response != -1) { + if (framesQueue.empty()) { + framesQueue.push(frame); } } usleep(10000); } - videoBuffer.disconnect(); - LOG4CPLUS_INFO(logger, "Video processing ended"); - delete tdata; + for (int i = 0; i < num_threads; i++) { + delete threads[i]; + } } diff --git a/src/daemon/daemonconfig.cpp b/src/daemon/daemonconfig.cpp index 0a6b203..c561e29 100644 --- a/src/daemon/daemonconfig.cpp +++ b/src/daemon/daemonconfig.cpp @@ -46,6 +46,7 @@ DaemonConfig::DaemonConfig(std::string config_file, std::string config_defaults_ country = getString(&ini, &defaultIni, "daemon", "country", "us"); topn = getInt(&ini, &defaultIni, "daemon", "topn", 20); + analysis_threads = getInt(&ini, &defaultIni, "daemon", "analysis_threads", 1); storePlates = getBoolean(&ini, &defaultIni, "daemon", "store_plates", false); imageFolder = getString(&ini, &defaultIni, "daemon", "store_plates_location", "/tmp/"); diff --git a/src/daemon/daemonconfig.h b/src/daemon/daemonconfig.h index 5f98ba3..b17b796 100644 --- a/src/daemon/daemonconfig.h +++ b/src/daemon/daemonconfig.h @@ -16,6 +16,7 @@ public: std::string country; int topn; + int analysis_threads; bool storePlates; std::string imageFolder; bool uploadData; diff --git a/src/inc/safequeue.h b/src/inc/safequeue.h new file mode 100644 index 0000000..03e07ec --- /dev/null +++ b/src/inc/safequeue.h @@ -0,0 +1,48 @@ +#ifndef SAFE_QUEUE_H_ +#define SAFE_QUEUE_H_ + +#include +#include +#include +#include + +template +class SafeQueue +{ + public: + T pop() + { + std::unique_lock mlock(_mutex); + while (_queue.empty()) { + _cond.wait(mlock); + } + auto val = _queue.front(); + _queue.pop(); + return val; + } + + void push(const T& item) + { + std::unique_lock mlock(_mutex); + _queue.push(item); + mlock.unlock(); + _cond.notify_one(); + } + + bool empty() + { + return _queue.empty(); + } + + SafeQueue() = default; + // Disable copying and assignments + SafeQueue(const SafeQueue&) = delete; + SafeQueue& operator=(const SafeQueue&) = delete; + + private: + std::queue _queue; + std::mutex _mutex; + std::condition_variable _cond; +}; + +#endif From 52e9316afae51c70e782c933b2bae4ac41f3a5e4 Mon Sep 17 00:00:00 2001 From: Sam Dieck Date: Wed, 29 Nov 2017 01:02:44 -0600 Subject: [PATCH 2/4] Fixed frame memory management bug. Added worker thread sleep. --- src/daemon.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/daemon.cpp b/src/daemon.cpp index 26a8c2d..09aaa16 100644 --- a/src/daemon.cpp +++ b/src/daemon.cpp @@ -309,6 +309,7 @@ void processingThread(void* arg) writeToQueue(response); } + usleep(10000); } } @@ -343,7 +344,7 @@ void streamRecognitionThread(void* arg) if (response != -1) { if (framesQueue.empty()) { - framesQueue.push(frame); + framesQueue.push(frame.clone()); } } From 33c50e34d573abe9f499f59e1e242c922dfc4691 Mon Sep 17 00:00:00 2001 From: Sam Dieck Date: Fri, 1 Dec 2017 00:19:03 -0600 Subject: [PATCH 3/4] Added analysis_threads to default config file --- config/alprd.conf.defaults | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config/alprd.conf.defaults b/config/alprd.conf.defaults index b58c225..0f766bf 100644 --- a/config/alprd.conf.defaults +++ b/config/alprd.conf.defaults @@ -16,6 +16,9 @@ country = us ; stream = http://127.0.0.1/example_second_stream.mjpeg ; stream = webcam +; Number of threads to analyze frames. +analysis_threads = 4 + ; topn is the number of possible plate character variations to report topn = 10 From 2f11e0e1e5c26c9a65b25aba527c8bbb7a72236e Mon Sep 17 00:00:00 2001 From: Sam Dieck Date: Wed, 13 Dec 2017 00:46:53 -0600 Subject: [PATCH 4/4] Changed safequeue implementation to make it compatible with c++98 --- src/CMakeLists.txt | 1 - src/inc/safequeue.h | 22 +++++++--------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 960ee27..c1df54e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,7 +130,6 @@ ELSE() ENDIF() -set (CMAKE_CXX_STANDARD 11) set(CMAKE_CSS_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall ") if (NOT IOS) ADD_EXECUTABLE( alpr main.cpp ) diff --git a/src/inc/safequeue.h b/src/inc/safequeue.h index 03e07ec..5e31299 100644 --- a/src/inc/safequeue.h +++ b/src/inc/safequeue.h @@ -2,9 +2,7 @@ #define SAFE_QUEUE_H_ #include -#include -#include -#include +#include "support/tinythread.h" template class SafeQueue @@ -12,20 +10,19 @@ class SafeQueue public: T pop() { - std::unique_lock mlock(_mutex); + tthread::lock_guard mlock(_mutex); while (_queue.empty()) { - _cond.wait(mlock); + _cond.wait(_mutex); } - auto val = _queue.front(); + T val = _queue.front(); _queue.pop(); return val; } void push(const T& item) { - std::unique_lock mlock(_mutex); + tthread::lock_guard mlock(_mutex); _queue.push(item); - mlock.unlock(); _cond.notify_one(); } @@ -34,15 +31,10 @@ class SafeQueue return _queue.empty(); } - SafeQueue() = default; - // Disable copying and assignments - SafeQueue(const SafeQueue&) = delete; - SafeQueue& operator=(const SafeQueue&) = delete; - private: std::queue _queue; - std::mutex _mutex; - std::condition_variable _cond; + tthread::mutex _mutex; + tthread::condition_variable _cond; }; #endif