diff --git a/.gitignore b/.gitignore index 1e3f1c8..bcb0675 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,10 @@ openalpr-*.asc openalpr-*.sig openalpr-*.tar.gz *.orig +.DS_Store + +# vim editor files +*.swp # Visual Studio files src.sln diff --git a/config/alprd.conf.defaults b/config/alprd.conf.defaults index b58c225..0f766bf 100644 --- a/config/alprd.conf.defaults +++ b/config/alprd.conf.defaults @@ -16,6 +16,9 @@ country = us ; stream = http://127.0.0.1/example_second_stream.mjpeg ; stream = webcam +; Number of threads to analyze frames. +analysis_threads = 4 + ; topn is the number of possible plate character variations to report topn = 10 diff --git a/src/daemon.cpp b/src/daemon.cpp index 42c6206..09aaa16 100644 --- a/src/daemon.cpp +++ b/src/daemon.cpp @@ -7,6 +7,7 @@ #include "daemon/beanstalk.hpp" #include "video/logging_videobuffer.h" #include "daemon/daemonconfig.h" +#include "inc/safequeue.h" #include "tclap/CmdLine.h" #include "alpr.h" @@ -23,7 +24,10 @@ using namespace alpr; -// prototypes +// Variables +SafeQueue framesQueue; + +// Prototypes void streamRecognitionThread(void* arg); bool writeToQueue(std::string jsonResult); bool uploadPost(CURL* curl, std::string url, std::string data); @@ -45,6 +49,7 @@ struct CaptureThreadData std::string stream_url; std::string site_id; int camera_id; + int analysis_threads; bool clock_on; @@ -200,6 +205,7 @@ int main( int argc, const char** argv ) tdata->country_code = daemon_config.country; tdata->company_id = daemon_config.company_id; tdata->site_id = daemon_config.site_id; + tdata->analysis_threads = daemon_config.analysis_threads; tdata->top_n = daemon_config.topn; tdata->pattern = daemon_config.pattern; tdata->clock_on = clockOn; @@ -209,10 +215,11 @@ int main( int argc, const char** argv ) if (daemon_config.uploadData) { - // Kick off the data upload thread - UploadThreadData* udata = new UploadThreadData(); - udata->upload_url = daemon_config.upload_url; - tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); + // Kick off the data upload thread + UploadThreadData* udata = new UploadThreadData(); + udata->upload_url = daemon_config.upload_url; + tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); + threads.push_back(thread_upload); } @@ -231,6 +238,82 @@ int main( int argc, const char** argv ) } +void processingThread(void* arg) +{ + CaptureThreadData* tdata = (CaptureThreadData*) arg; + Alpr alpr(tdata->country_code, tdata->config_file); + alpr.setTopN(tdata->top_n); + alpr.setDefaultRegion(tdata->pattern); + + while (daemon_active) { + + // Wait for a new frame + cv::Mat frame = framesQueue.pop(); + + // Process new frame + timespec startTime; + getTimeMonotonic(&startTime); + + std::vector regionsOfInterest; + regionsOfInterest.push_back(AlprRegionOfInterest(0,0, frame.cols, frame.rows)); + + AlprResults results = alpr.recognize(frame.data, frame.elemSize(), frame.cols, frame.rows, regionsOfInterest); + + timespec endTime; + getTimeMonotonic(&endTime); + double totalProcessingTime = diffclock(startTime, endTime); + + if (tdata->clock_on) { + LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); + } + + if (results.plates.size() > 0) { + + std::stringstream uuid_ss; + uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs(); + std::string uuid = uuid_ss.str(); + + // Save the image to disk (using the UUID) + if (tdata->output_images) { + std::stringstream ss; + ss << tdata->output_image_folder << "/" << uuid << ".jpg"; + cv::imwrite(ss.str(), frame); + } + + // Update the JSON content to include UUID and camera ID + std::string json = alpr.toJson(results); + cJSON *root = cJSON_Parse(json.c_str()); + cJSON_AddStringToObject(root, "uuid", uuid.c_str()); + cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); + cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); + cJSON_AddNumberToObject(root, "img_width", frame.cols); + cJSON_AddNumberToObject(root, "img_height", frame.rows); + + // Add the company ID to the output if configured + if (tdata->company_id.length() > 0) + cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str()); + + char *out; + out=cJSON_PrintUnformatted(root); + cJSON_Delete(root); + + std::string response(out); + + free(out); + + // Push the results to the Beanstalk queue + for (int j = 0; j < results.plates.size(); j++) + { + LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue."); + } + + writeToQueue(response); + } + usleep(10000); + } +} + + void streamRecognitionThread(void* arg) { CaptureThreadData* tdata = (CaptureThreadData*) arg; @@ -239,106 +322,41 @@ void streamRecognitionThread(void* arg) LOG4CPLUS_INFO(logger, "pattern: " << tdata->pattern); LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url); - Alpr alpr(tdata->country_code, tdata->config_file); - alpr.setTopN(tdata->top_n); - alpr.setDefaultRegion(tdata->pattern); - - - int framenum = 0; + /* Create processing threads */ + const int num_threads = tdata->analysis_threads; + tthread::thread* threads[num_threads]; + + for (int i = 0; i < num_threads; i++) { + LOG4CPLUS_INFO(logger, "Spawning Thread " << i ); + tthread::thread* t = new tthread::thread(processingThread, (void*) tdata); + threads[i] = t; + } + cv::Mat frame; LoggingVideoBuffer videoBuffer(logger); - videoBuffer.connect(tdata->stream_url, 5); - - cv::Mat latestFrame; - - std::vector buffer; - LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id); while (daemon_active) { std::vector regionsOfInterest; - int response = videoBuffer.getLatestFrame(&latestFrame, regionsOfInterest); + int response = videoBuffer.getLatestFrame(&frame, regionsOfInterest); - if (response != -1) - { - - timespec startTime; - getTimeMonotonic(&startTime); - - std::vector regionsOfInterest; - regionsOfInterest.push_back(AlprRegionOfInterest(0,0, latestFrame.cols, latestFrame.rows)); - - AlprResults results = alpr.recognize(latestFrame.data, latestFrame.elemSize(), latestFrame.cols, latestFrame.rows, regionsOfInterest); - - timespec endTime; - getTimeMonotonic(&endTime); - double totalProcessingTime = diffclock(startTime, endTime); - - if (tdata->clock_on) - { - LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); - } - - if (results.plates.size() > 0) - { - - std::stringstream uuid_ss; - uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs(); - std::string uuid = uuid_ss.str(); - - // Save the image to disk (using the UUID) - if (tdata->output_images) - { - std::stringstream ss; - ss << tdata->output_image_folder << "/" << uuid << ".jpg"; - - cv::imwrite(ss.str(), latestFrame); - } - - // Update the JSON content to include UUID and camera ID - - std::string json = alpr.toJson(results); - - cJSON *root = cJSON_Parse(json.c_str()); - cJSON_AddStringToObject(root, "uuid", uuid.c_str()); - cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); - cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); - cJSON_AddNumberToObject(root, "img_width", latestFrame.cols); - cJSON_AddNumberToObject(root, "img_height", latestFrame.rows); - - // Add the company ID to the output if configured - if (tdata->company_id.length() > 0) - cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str()); - - char *out; - out=cJSON_PrintUnformatted(root); - cJSON_Delete(root); - - std::string response(out); - - free(out); - - // Push the results to the Beanstalk queue - for (int j = 0; j < results.plates.size(); j++) - { - LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue."); - } - - writeToQueue(response); + if (response != -1) { + if (framesQueue.empty()) { + framesQueue.push(frame.clone()); } } usleep(10000); } - videoBuffer.disconnect(); - LOG4CPLUS_INFO(logger, "Video processing ended"); - delete tdata; + for (int i = 0; i < num_threads; i++) { + delete threads[i]; + } } diff --git a/src/daemon/daemonconfig.cpp b/src/daemon/daemonconfig.cpp index 0a6b203..c561e29 100644 --- a/src/daemon/daemonconfig.cpp +++ b/src/daemon/daemonconfig.cpp @@ -46,6 +46,7 @@ DaemonConfig::DaemonConfig(std::string config_file, std::string config_defaults_ country = getString(&ini, &defaultIni, "daemon", "country", "us"); topn = getInt(&ini, &defaultIni, "daemon", "topn", 20); + analysis_threads = getInt(&ini, &defaultIni, "daemon", "analysis_threads", 1); storePlates = getBoolean(&ini, &defaultIni, "daemon", "store_plates", false); imageFolder = getString(&ini, &defaultIni, "daemon", "store_plates_location", "/tmp/"); diff --git a/src/daemon/daemonconfig.h b/src/daemon/daemonconfig.h index 5f98ba3..b17b796 100644 --- a/src/daemon/daemonconfig.h +++ b/src/daemon/daemonconfig.h @@ -16,6 +16,7 @@ public: std::string country; int topn; + int analysis_threads; bool storePlates; std::string imageFolder; bool uploadData; diff --git a/src/inc/safequeue.h b/src/inc/safequeue.h new file mode 100644 index 0000000..5e31299 --- /dev/null +++ b/src/inc/safequeue.h @@ -0,0 +1,40 @@ +#ifndef SAFE_QUEUE_H_ +#define SAFE_QUEUE_H_ + +#include +#include "support/tinythread.h" + +template +class SafeQueue +{ + public: + T pop() + { + tthread::lock_guard mlock(_mutex); + while (_queue.empty()) { + _cond.wait(_mutex); + } + T val = _queue.front(); + _queue.pop(); + return val; + } + + void push(const T& item) + { + tthread::lock_guard mlock(_mutex); + _queue.push(item); + _cond.notify_one(); + } + + bool empty() + { + return _queue.empty(); + } + + private: + std::queue _queue; + tthread::mutex _mutex; + tthread::condition_variable _cond; +}; + +#endif