diff --git a/alprd.conf b/alprd.conf new file mode 100644 index 0000000..265fd8e --- /dev/null +++ b/alprd.conf @@ -0,0 +1,24 @@ +# alprd - OpenALPR daemon +# +# OpenALPR daemon detects license plate in the background + +description "OpenALPR daemon" +author "Matt Hill" + +start on filesystem and static-network-up +stop on runlevel [016] + +expect fork + +respawn +respawn limit 5 30 + +env uid=watchtower +env gid=watchtower + +env DAEMON=/usr/local/bin/alprd +env DAEMON_ARGS="-l /var/log/watchtower/alprd.log" +env PIDFILE=/var/run/alprd.pid + +exec start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON --chuid $uid:$gid -- $DAEMON_ARGS + diff --git a/config/alprd.conf b/config/alprd.conf new file mode 100644 index 0000000..df6d7dd --- /dev/null +++ b/config/alprd.conf @@ -0,0 +1,18 @@ + + +[daemon] +; Declare each stream on a separate line +; each unique stream should be defined as stream = [url] + +;stream = http://www.google.com/video.stream +;stream = http://stream2.com/stream + +site_id = site-name + +store_plates = 0 +store_plates_location = /var/www/html/plates/ + +; upload address is the destination to POST to +upload_data = 0 +upload_address = http://localhost:9000/alpr/push/ + diff --git a/plate_size_quota.py b/plate_size_quota.py new file mode 100644 index 0000000..72ebb1e --- /dev/null +++ b/plate_size_quota.py @@ -0,0 +1,75 @@ +import os +import shutil + +BYTES_IN_A_MEGABYTE = 1048576 + +size_quota_mb=200000 +size_quota_bytes = size_quota_mb * 1048576 +dir='/var/www/html/plates/' + + +def get_size(start_path = '.'): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(start_path): + for f in filenames: + fp = os.path.join(dirpath, f) + total_size += os.path.getsize(fp) + return total_size + + + + + +all_files = [] + +os.chdir(dir) + +initial_dir_size = get_size() +print initial_dir_size + +if (initial_dir_size <= size_quota_bytes): + dir_size_mb = float(initial_dir_size) / float(BYTES_IN_A_MEGABYTE) + print "Directory is within quota (" + str(dir_size_mb) + " / " + str(size_quota_mb) + " MB)" + exit() + +for files in os.listdir("."): + #print files + #print " -- " + str(os.stat(files)) + if os.path.isdir(files): + filetuple = ( os.stat(files).st_mtime, get_size(files), files) + all_files.append( filetuple ) + else: + filetuple = ( os.stat(files).st_mtime, os.stat(files).st_size, files) + all_files.append( filetuple ) + + +print "UNSORTED" + +for file in all_files: + print file[0] + +#print all_files + +all_files.sort(key=lambda tup: tup[0]) + +print "SORTED" + +for file in all_files: + print file[2] + +bytes_left_to_delete = initial_dir_size - size_quota_bytes + +for fileinfo in all_files: + if bytes_left_to_delete <= 0: + break + filename = fileinfo[2] + filebytes = fileinfo[1] + + print "Deleting: " + filename + " (" + str(filebytes) + " bytes)" + + if (os.path.isdir(filename)): + shutil.rmtree(filename, True) + else: + os.remove(filename) + bytes_left_to_delete = bytes_left_to_delete - filebytes + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f019eae..9c0e5ad 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,6 +62,7 @@ include_directories(./openalpr ) set(CMAKE_CSS_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall ") ADD_EXECUTABLE( alpr main.cpp videobuffer.cpp ) +ADD_EXECUTABLE( alprd daemon.cpp videobuffer.cpp daemon/beanstalk.c daemon/beanstalk.cc daemon/uuid.cpp ) TARGET_LINK_LIBRARIES(alpr @@ -70,6 +71,18 @@ TARGET_LINK_LIBRARIES(alpr ${OpenCV_LIBS} ${Tesseract_LIBS} ) + + +TARGET_LINK_LIBRARIES(alprd + openalpr + support + uuid + curl + log4cplus + ${OpenCV_LIBS} + ${Tesseract_LIBS} + ) + add_subdirectory(openalpr) add_subdirectory(misc_utilities) diff --git a/src/daemon.cpp b/src/daemon.cpp new file mode 100644 index 0000000..068dfe2 --- /dev/null +++ b/src/daemon.cpp @@ -0,0 +1,441 @@ + + +#include +#include + +#include "daemon/beanstalk.hpp" +#include "daemon/uuid.h" + +#include "tclap/CmdLine.h" +#include "alpr.h" +#include "openalpr/simpleini/simpleini.h" +#include "openalpr/cjson.h" +#include "support/tinythread.h" +#include "videobuffer.h" +#include +#include "support/timing.h" + +#include +#include +#include +#include +#include + +// prototypes +void streamRecognitionThread(void* arg); +bool writeToQueue(std::string jsonResult); +bool uploadPost(std::string url, std::string data); +void dataUploadThread(void* arg); + +// Constants +const std::string DEFAULT_LOG_FILE_PATH="/var/log/openalpr.log"; +const std::string DAEMON_CONFIG_FILE_PATH="/etc/openalpr/alprd.conf"; + +const std::string BEANSTALK_QUEUE_HOST="127.0.0.1"; +const int BEANSTALK_PORT=11300; +const std::string BEANSTALK_TUBE_NAME="alprd"; + +struct CaptureThreadData +{ + std::string stream_url; + std::string site_id; + int camera_id; + + bool clock_on; + + std::string config_file; + std::string country_code; + bool output_images; + std::string output_image_folder; + int top_n; +}; + +struct UploadThreadData +{ + std::string upload_url; +}; + +bool daemon_active; + +static log4cplus::Logger logger; + +int main( int argc, const char** argv ) +{ + daemon_active = true; + + bool noDaemon = false; + bool clockOn = false; + std::string logFile; + int topn; + + std::string configFile; + std::string country; + + TCLAP::CmdLine cmd("OpenAlpr Daemon", ' ', Alpr::getVersion()); + + TCLAP::ValueArg countryCodeArg("c","country","Country code to identify (either us for USA or eu for Europe). Default=us",false, "us" ,"country_code"); + TCLAP::ValueArg configFileArg("","config","Path to the openalpr.conf file.",false, "" ,"config_file"); + TCLAP::ValueArg topNArg("n","topn","Max number of possible plate numbers to return. Default=25",false, 25 ,"topN"); + TCLAP::ValueArg logFileArg("l","log","Log file to write to. Default=" + DEFAULT_LOG_FILE_PATH,false, DEFAULT_LOG_FILE_PATH ,"topN"); + + TCLAP::SwitchArg daemonOffSwitch("f","foreground","Set this flag for debugging. Disables forking the process as a daemon and runs in the foreground. Default=off", cmd, false); + TCLAP::SwitchArg clockSwitch("","clock","Display timing information to log. Default=off", cmd, false); + + try + { + + cmd.add( topNArg ); + cmd.add( configFileArg ); + cmd.add( logFileArg ); + + + if (cmd.parse( argc, argv ) == false) + { + // Error occured while parsing. Exit now. + return 1; + } + + country = countryCodeArg.getValue(); + configFile = configFileArg.getValue(); + logFile = logFileArg.getValue(); + topn = topNArg.getValue(); + noDaemon = daemonOffSwitch.getValue(); + clockOn = clockSwitch.getValue(); + } + catch (TCLAP::ArgException &e) // catch any exceptions + { + std::cerr << "error: " << e.error() << " for arg " << e.argId() << std::endl; + return 1; + } + + log4cplus::BasicConfigurator config; + config.configure(); + + if (noDaemon == false) + { + // Fork off into a separate daemon + daemon(0, 0); + + + log4cplus::SharedAppenderPtr myAppender(new log4cplus::RollingFileAppender(logFile)); + myAppender->setName("alprd_appender"); + // Redirect std out to log file + logger = log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("alprd")); + logger.addAppender(myAppender); + + + LOG4CPLUS_INFO(logger, "Running OpenALPR daemon in daemon mode."); + } + else + { + //log4cplus::SharedAppenderPtr myAppender(new log4cplus::ConsoleAppender()); + //myAppender->setName("alprd_appender"); + // Redirect std out to log file + logger = log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("alprd")); + //logger.addAppender(myAppender); + + LOG4CPLUS_INFO(logger, "Running OpenALPR daemon in the foreground."); + } + + CSimpleIniA ini; + ini.SetMultiKey(); + + ini.LoadFile(DAEMON_CONFIG_FILE_PATH.c_str()); + + std::vector stream_urls; + + + CSimpleIniA::TNamesDepend values; + ini.GetAllValues("daemon", "stream", values); + + // sort the values into the original load order + values.sort(CSimpleIniA::Entry::LoadOrder()); + + // output all of the items + CSimpleIniA::TNamesDepend::const_iterator i; + for (i = values.begin(); i != values.end(); ++i) { + stream_urls.push_back(i->pItem); + } + + + if (stream_urls.size() == 0) + { + LOG4CPLUS_FATAL(logger, "No video streams defined in the configuration."); + return 1; + } + + bool storePlates = ini.GetBoolValue("daemon", "store_plates", false); + std::string imageFolder = ini.GetValue("daemon", "store_plates_location", "/tmp/"); + bool uploadData = ini.GetBoolValue("daemon", "upload_data", false); + std::string upload_url = ini.GetValue("daemon", "upload_address", ""); + std::string site_id = ini.GetValue("daemon", "site_id", ""); + + LOG4CPLUS_INFO(logger, "Using: " << imageFolder << " for storing valid plate images"); + + pid_t pid; + + for (int i = 0; i < stream_urls.size(); i++) + { + pid = fork(); + if (pid == (pid_t) 0) + { + // This is the child process, kick off the capture data and upload threads + CaptureThreadData* tdata = new CaptureThreadData(); + tdata->stream_url = stream_urls[i]; + tdata->camera_id = i + 1; + tdata->config_file = configFile; + tdata->output_images = storePlates; + tdata->output_image_folder = imageFolder; + tdata->country_code = country; + tdata->site_id = site_id; + tdata->top_n = topn; + tdata->clock_on = clockOn; + + tthread::thread* thread_recognize = new tthread::thread(streamRecognitionThread, (void*) tdata); + + if (uploadData) + { + // Kick off the data upload thread + UploadThreadData* udata = new UploadThreadData(); + udata->upload_url = upload_url; + tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata ); + } + + break; + } + + // Parent process will continue and spawn more children + } + + + + while (daemon_active) + { + usleep(30000); + } + +} + + +void streamRecognitionThread(void* arg) +{ + CaptureThreadData* tdata = (CaptureThreadData*) arg; + + LOG4CPLUS_INFO(logger, "country: " << tdata->country_code << " -- config file: " << tdata->config_file ); + LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url); + + Alpr alpr(tdata->country_code, tdata->config_file); + alpr.setTopN(tdata->top_n); + + + int framenum = 0; + + VideoBuffer videoBuffer; + + videoBuffer.connect(tdata->stream_url, 5); + + cv::Mat latestFrame; + + std::vector buffer; + + LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id); + + while (daemon_active) + { + int response = videoBuffer.getLatestFrame(&latestFrame); + + if (response != -1) + { + + timespec startTime; + getTime(&startTime); + cv::imencode(".bmp", latestFrame, buffer ); + std::vector results = alpr.recognize(buffer); + + timespec endTime; + getTime(&endTime); + double totalProcessingTime = diffclock(startTime, endTime); + + if (tdata->clock_on) + { + LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms."); + } + + if (results.size() > 0) + { + // Create a UUID for the image + std::string uuid = newUUID(); + + // Save the image to disk (using the UUID) + if (tdata->output_images) + { + std::stringstream ss; + ss << tdata->output_image_folder << "/" << uuid << ".jpg"; + + cv::imwrite(ss.str(), latestFrame); + } + + // Update the JSON content to include UUID and camera ID + + std::string json = alpr.toJson(results, totalProcessingTime); + + cJSON *root = cJSON_Parse(json.c_str()); + cJSON_AddStringToObject(root, "uuid", uuid.c_str()); + cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id); + cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str()); + cJSON_AddNumberToObject(root, "img_width", latestFrame.cols); + cJSON_AddNumberToObject(root, "img_height", latestFrame.rows); + + char *out; + out=cJSON_PrintUnformatted(root); + cJSON_Delete(root); + + std::string response(out); + + free(out); + + // Push the results to the Beanstalk queue + for (int j = 0; j < results.size(); j++) + { + LOG4CPLUS_DEBUG(logger, "Writing plate " << results[j].bestPlate.characters << " (" << uuid << ") to queue."); + } + + writeToQueue(response); + } + } + + usleep(10000); + } + + + videoBuffer.disconnect(); + + LOG4CPLUS_INFO(logger, "Video processing ended"); + + delete tdata; +} + + +bool writeToQueue(std::string jsonResult) +{ + try + { + Beanstalk::Client client(BEANSTALK_QUEUE_HOST, BEANSTALK_PORT); + client.use(BEANSTALK_TUBE_NAME); + + int id = client.put(jsonResult); + + if (id <= 0) + { + LOG4CPLUS_ERROR(logger, "Failed to write data to queue"); + return false; + } + + LOG4CPLUS_DEBUG(logger, "put job id: " << id ); + + } + catch (const std::runtime_error& error) + { + LOG4CPLUS_WARN(logger, "Error connecting to Beanstalk. Result has not been saved."); + return false; + } + return true; +} + + + +void dataUploadThread(void* arg) +{ + + /* In windows, this will init the winsock stuff */ + curl_global_init(CURL_GLOBAL_ALL); + + + UploadThreadData* udata = (UploadThreadData*) arg; + + + + + while(daemon_active) + { + try + { + Beanstalk::Client client(BEANSTALK_QUEUE_HOST, BEANSTALK_PORT); + + client.watch(BEANSTALK_TUBE_NAME); + + while (daemon_active) + { + Beanstalk::Job job; + + client.reserve(job); + + if (job.id() > 0) + { + //LOG4CPLUS_DEBUG(logger, job.body() ); + if (uploadPost(udata->upload_url, job.body())) + { + client.del(job.id()); + LOG4CPLUS_INFO(logger, "Job: " << job.id() << " successfully uploaded" ); + // Wait 10ms + usleep(10000); + } + else + { + client.release(job); + LOG4CPLUS_WARN(logger, "Job: " << job.id() << " failed to upload. Will retry." ); + // Wait 2 seconds + usleep(2000000); + } + } + + } + + } + catch (const std::runtime_error& error) + { + LOG4CPLUS_WARN(logger, "Error connecting to Beanstalk. Will retry." ); + } + // wait 5 seconds + usleep(5000000); + } + + curl_global_cleanup(); +} + + +bool uploadPost(std::string url, std::string data) +{ + bool success = true; + CURL *curl; + CURLcode res; + + + /* get a curl handle */ + curl = curl_easy_init(); + if(curl) { + /* First set the URL that is about to receive our POST. This URL can + just as well be a https:// URL if that is what should receive the + data. */ + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + /* Now specify the POST data */ + //char* escaped_data = curl_easy_escape(curl, data.c_str(), data.length()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str()); + //curl_free(escaped_data); + + /* Perform the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if(res != CURLE_OK) + { + success = false; + } + + /* always cleanup */ + curl_easy_cleanup(curl); + } + + return success; + + +} \ No newline at end of file diff --git a/src/daemon/beanstalk.c b/src/daemon/beanstalk.c new file mode 100644 index 0000000..e1252d2 --- /dev/null +++ b/src/daemon/beanstalk.c @@ -0,0 +1,645 @@ +#include "beanstalk.h" +#include +#include +#include +#include +#include +#include +#include + +#define BS_STATUS_IS(message, code) strncmp(message, code, strlen(code)) == 0 + +#define BS_MESSAGE_NO_BODY 0 +#define BS_MESSAGE_HAS_BODY 1 + +#ifndef BS_READ_CHUNK_SIZE +#define BS_READ_CHUNK_SIZE 4096 +#endif + +#define DATA_PENDING (errno == EAGAIN || errno == EWOULDBLOCK) + +const char *bs_status_verbose[] = { + "Success", + "Operation failed", + "Expected CRLF", + "Job too big", + "Queue draining", + "Timed out", + "Not found", + "Deadline soon", + "Buried", + "Not ignored" +}; + +const char bs_resp_using[] = "USING"; +const char bs_resp_watching[] = "WATCHING"; +const char bs_resp_inserted[] = "INSERTED"; +const char bs_resp_buried[] = "BURIED"; +const char bs_resp_expected_crlf[] = "EXPECTED_CRLF"; +const char bs_resp_job_too_big[] = "JOB_TOO_BIG"; +const char bs_resp_draining[] = "DRAINING"; +const char bs_resp_reserved[] = "RESERVED"; +const char bs_resp_deadline_soon[] = "DEADLINE_SOON"; +const char bs_resp_timed_out[] = "TIMED_OUT"; +const char bs_resp_deleted[] = "DELETED"; +const char bs_resp_not_found[] = "NOT_FOUND"; +const char bs_resp_released[] = "RELEASED"; +const char bs_resp_touched[] = "TOUCHED"; +const char bs_resp_not_ignored[] = "NOT_IGNORED"; +const char bs_resp_found[] = "FOUND"; +const char bs_resp_kicked[] = "KICKED"; +const char bs_resp_ok[] = "OK"; + +const char* bs_status_text(int code) { + unsigned int cindex = (unsigned int) abs(code); + return (cindex > sizeof(bs_status_verbose) / sizeof(char*)) ? 0 : bs_status_verbose[cindex]; +} + +int bs_resolve_address(char *host, int port, struct sockaddr_in *server) { + char service[64]; + struct addrinfo *addr, *rec; + + snprintf(service, 64, "%d", port); + if (getaddrinfo(host, service, 0, &addr) != 0) + return BS_STATUS_FAIL; + + for (rec = addr; rec != 0; rec = rec->ai_next) { + if (rec->ai_family == AF_INET) { + memcpy(server, rec->ai_addr, sizeof(*server)); + break; + } + } + + freeaddrinfo(addr); + return BS_STATUS_OK; +} + +int bs_connect(char *host, int port) { + int fd, state = 1; + struct sockaddr_in server; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0 || bs_resolve_address(host, port, &server) < 0) + return BS_STATUS_FAIL; + + if (connect(fd, (struct sockaddr*)&server, sizeof(server)) != 0) { + close(fd); + return BS_STATUS_FAIL; + } + + /* disable nagle - we buffer in the application layer */ + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &state, sizeof(state)); + return fd; +} + +int bs_connect_with_timeout(char *host, int port, float secs) { + struct sockaddr_in server; + int fd, res, option, state = 1; + socklen_t option_length; + struct pollfd pfd; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0 || bs_resolve_address(host, port, &server) < 0) + return BS_STATUS_FAIL; + + // Set non-blocking + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, NULL) | O_NONBLOCK); + + res = connect(fd, (struct sockaddr*)&server, sizeof(server)); + if (res < 0) { + if (errno == EINPROGRESS) { + // Init poll structure + pfd.fd = fd; + pfd.events = POLLOUT; + + if (poll(&pfd, 1, (int)(secs*1000)) > 0) { + option_length = sizeof(int); + getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)(&option), &option_length); + if (option) { + close(fd); + return BS_STATUS_FAIL; + } + } else { + close(fd); + return BS_STATUS_FAIL; + } + } else { + close(fd); + return BS_STATUS_FAIL; + } + } + + // Set to blocking mode + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, NULL) & ~(O_NONBLOCK)); + + /* disable nagle - we buffer in the application layer */ + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &state, sizeof(state)); + return fd; +} + +int bs_disconnect(int fd) { + close(fd); + return BS_STATUS_OK; +} + +void bs_free_message(BSM* m) { + if (m->status) + free(m->status); + if (m->data) + free(m->data); + free(m); +} + +void bs_free_job(BSJ *job) { + if (job->data) + free(job->data); + free(job); +} + +// optional polling +bs_poll_function bs_poll = 0; + +void bs_start_polling(bs_poll_function f) { + bs_poll = f; +} + +void bs_reset_polling() { + bs_poll = 0; +} + +BSM* bs_recv_message(int fd, int expect_data) { + char *token, *data; + size_t bytes, data_size, status_size, status_max = 512, expect_data_bytes = 0; + ssize_t ret; + BSM *message = (BSM*)calloc(1, sizeof(BSM)); + if (!message) return 0; + + message->status = (char*)calloc(1, status_max); + if (!message->status) { + bs_free_message(message); + return 0; + } + + // poll until ready to read + if (bs_poll) bs_poll(1, fd); + ret = recv(fd, message->status, status_max - 1, 0); + if (ret < 0) { + bs_free_message(message); + return 0; + } else { + bytes = (size_t) ret; + } + + token = strstr(message->status, "\r\n"); + if (!token) { + bs_free_message(message); + return 0; + } + + *token = 0; + status_size = (size_t) (token - message->status); + + if (expect_data) { + token = rindex(message->status, ' '); + expect_data_bytes = token ? strtoul(token + 1, NULL, 10) : 0; + } + + if (!expect_data || expect_data_bytes == 0) + return message; + + message->size = bytes - status_size - 2; + data_size = message->size > BS_READ_CHUNK_SIZE ? message->size + BS_READ_CHUNK_SIZE : BS_READ_CHUNK_SIZE; + + message->data = (char*)malloc(data_size); + if (!message->data) { + bs_free_message(message); + return 0; + } + + memcpy(message->data, message->status + status_size + 2, message->size); + data = message->data + message->size; + + // already read the body along with status, all good. + if ((expect_data_bytes + 2) <= message->size) { + message->size = expect_data_bytes; + return message; + } + + while (1) { + // poll until ready to read. + if (bs_poll) bs_poll(1, fd); + ret = recv(fd, data, data_size - message->size, 0); + if (ret < 0) { + if (bs_poll && DATA_PENDING) + continue; + else { + bs_free_message(message); + return 0; + } + } else { + bytes = (size_t) ret; + } + + // doneski, we have read enough bytes + \r\n + if (message->size + bytes >= expect_data_bytes + 2) { + message->size = expect_data_bytes; + break; + } + + data_size += BS_READ_CHUNK_SIZE; + message->size += bytes; + message->data = (char*)realloc(message->data, data_size); + if (!message->data) { + bs_free_message(message); + return 0; + } + + // move ahead pointer for reading more. + data = message->data + message->size; + } + + return message; +} + +ssize_t bs_send_message(int fd, char *message, size_t size) { + // poll until ready to write. + if (bs_poll) bs_poll(2, fd); + return send(fd, message, size, bs_poll ? MSG_DONTWAIT : 0); +} + +typedef struct bs_message_packet { + char *data; + size_t offset; + size_t size; +} BSMP; + +BSMP* bs_message_packet_new(size_t bytes) { + BSMP *packet = (BSMP*)malloc(sizeof(BSMP)); + assert(packet); + + packet->data = (char*)malloc(bytes); + assert(packet->data); + + packet->offset = 0; + packet->size = bytes; + + return packet; +} + +void bs_message_packet_append(BSMP *packet, char *data, size_t bytes) { + if (packet->offset + bytes > packet->size) { + packet->data = (char*)realloc(packet->data, packet->size + bytes); + assert(packet->data); + packet->size += bytes; + } + + memcpy(packet->data + packet->offset, data, bytes); + packet->offset += bytes; +} + +void bs_message_packet_free(BSMP *packet) { + free(packet->data); + free(packet); +} + +#define BS_SEND(fd, command, size) { \ + if (bs_send_message(fd, command, size) < 0) \ + return BS_STATUS_FAIL; \ +} + +#define BS_CHECK_MESSAGE(message) { \ + if (!message) \ + return BS_STATUS_FAIL; \ +} + +#define BS_RETURN_OK_WHEN(message, okstatus) { \ + if (BS_STATUS_IS(message->status, okstatus)) { \ + bs_free_message(message); \ + return BS_STATUS_OK; \ + } \ +} + +#define BS_RETURN_FAIL_WHEN(message, nokstatus, nokcode) { \ + if (BS_STATUS_IS(message->status, nokstatus)) { \ + bs_free_message(message); \ + return nokcode; \ + } \ +} + +#define BS_RETURN_INVALID(message) { \ + bs_free_message(message); \ + return BS_STATUS_FAIL; \ +} + +int bs_use(int fd, char *tube) { + BSM *message; + char command[1024]; + + snprintf(command, 1024, "use %s\r\n", tube); + BS_SEND(fd, command, strlen(command)); + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_using); + BS_RETURN_INVALID(message); +} + +int bs_watch(int fd, char *tube) { + BSM *message; + char command[1024]; + + snprintf(command, 1024, "watch %s\r\n", tube); + BS_SEND(fd, command, strlen(command)); + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_watching); + BS_RETURN_INVALID(message); +} + +int bs_ignore(int fd, char *tube) { + BSM *message; + char command[1024]; + + snprintf(command, 1024, "ignore %s\r\n", tube); + BS_SEND(fd, command, strlen(command)); + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_watching); + BS_RETURN_INVALID(message); +} + +int64_t bs_put(int fd, uint32_t priority, uint32_t delay, uint32_t ttr, char *data, size_t bytes) { + int64_t id; + BSMP *packet; + BSM *message; + char command[1024]; + size_t command_bytes; + + snprintf(command, 1024, "put %"PRIu32" %"PRIu32" %"PRIu32" %lu\r\n", priority, delay, ttr, bytes); + + command_bytes = strlen(command); + packet = bs_message_packet_new(command_bytes + bytes + 3); + bs_message_packet_append(packet, command, command_bytes); + bs_message_packet_append(packet, data, bytes); + bs_message_packet_append(packet, "\r\n", 2); + + // Can't use BS_SEND here, allocated memory needs to + // be cleared on error + int ret_code = bs_send_message(fd, packet->data, packet->offset); + bs_message_packet_free(packet); + if (ret_code <0) { + return BS_STATUS_FAIL; + } + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + + if (BS_STATUS_IS(message->status, bs_resp_inserted)) { + id = strtoll(message->status + strlen(bs_resp_inserted) + 1, NULL, 10); + bs_free_message(message); + return id; + } + + if (BS_STATUS_IS(message->status, bs_resp_buried)) { + id = strtoll(message->status + strlen(bs_resp_buried) + 1, NULL, 10); + bs_free_message(message); + return id; + } + + BS_RETURN_FAIL_WHEN(message, bs_resp_expected_crlf, BS_STATUS_EXPECTED_CRLF); + BS_RETURN_FAIL_WHEN(message, bs_resp_job_too_big, BS_STATUS_JOB_TOO_BIG); + BS_RETURN_FAIL_WHEN(message, bs_resp_draining, BS_STATUS_DRAINING); + BS_RETURN_INVALID(message); +} + +int bs_delete(int fd, int64_t job) { + BSM *message; + char command[512]; + + snprintf(command, 512, "delete %"PRId64"\r\n", job); + BS_SEND(fd, command, strlen(command)); + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_deleted); + BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND); + BS_RETURN_INVALID(message); +} + +int bs_reserve_job(int fd, char *command, BSJ **result) { + BSJ *job; + BSM *message; + +// XXX: debug +// struct timeval start, end; +// gettimeofday(&start, 0); + + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY); + BS_CHECK_MESSAGE(message); + + if (BS_STATUS_IS(message->status, bs_resp_reserved)) { + *result = job = (BSJ*)malloc(sizeof(BSJ)); + if (!job) { + bs_free_message(message); + return BS_STATUS_FAIL; + } + + sscanf(message->status + strlen(bs_resp_reserved) + 1, "%"PRId64" %lu", &job->id, &job->size); + job->data = message->data; + message->data = 0; + bs_free_message(message); + + // XXX: debug + // gettimeofday(&end, 0); + // printf("elapsed: %lu\n", (end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec)); + return BS_STATUS_OK; + } + + // i don't think we'll ever hit this status code here. + BS_RETURN_FAIL_WHEN(message, bs_resp_timed_out, BS_STATUS_TIMED_OUT); + BS_RETURN_FAIL_WHEN(message, bs_resp_deadline_soon, BS_STATUS_DEADLINE_SOON); + BS_RETURN_INVALID(message); +} + +int bs_reserve(int fd, BSJ **result) { + char *command = "reserve\r\n"; + return bs_reserve_job(fd, command, result); +} + +int bs_reserve_with_timeout(int fd, uint32_t ttl, BSJ **result) { + char command[512]; + snprintf(command, 512, "reserve-with-timeout %"PRIu32"\r\n", ttl); + return bs_reserve_job(fd, command, result); +} + +int bs_release(int fd, int64_t id, uint32_t priority, uint32_t delay) { + BSM *message; + char command[512]; + + snprintf(command, 512, "release %"PRId64" %"PRIu32" %"PRIu32"\r\n", id, priority, delay); + BS_SEND(fd, command, strlen(command)); + + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_released); + BS_RETURN_FAIL_WHEN(message, bs_resp_buried, BS_STATUS_BURIED); + BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND); + BS_RETURN_INVALID(message); +} + +int bs_bury(int fd, int64_t id, uint32_t priority) { + BSM *message; + char command[512]; + + snprintf(command, 512, "bury %"PRId64" %"PRIu32"\r\n", id, priority); + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_buried); + BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND); + BS_RETURN_INVALID(message); +} + +int bs_touch(int fd, int64_t id) { + BSM *message; + char command[512]; + + snprintf(command, 512, "touch %"PRId64"\r\n", id); + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_touched); + BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND); + BS_RETURN_INVALID(message); +} + +int bs_peek_job(int fd, char *command, BSJ **result) { + BSJ *job; + BSM *message; + + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY); + BS_CHECK_MESSAGE(message); + + if (BS_STATUS_IS(message->status, bs_resp_found)) { + *result = job = (BSJ*)malloc(sizeof(BSJ)); + if (!job) { + bs_free_message(message); + return BS_STATUS_FAIL; + } + + sscanf(message->status + strlen(bs_resp_found) + 1, "%"PRId64" %lu", &job->id, &job->size); + job->data = message->data; + message->data = 0; + bs_free_message(message); + + return BS_STATUS_OK; + } + + BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND); + BS_RETURN_INVALID(message); +} + +int bs_peek(int fd, int64_t id, BSJ **job) { + char command[512]; + snprintf(command, 512, "peek %"PRId64"\r\n", id); + return bs_peek_job(fd, command, job); +} + +int bs_peek_ready(int fd, BSJ **job) { + return bs_peek_job(fd, "peek-ready\r\n", job); +} + +int bs_peek_delayed(int fd, BSJ **job) { + return bs_peek_job(fd, "peek-delayed\r\n", job); +} + +int bs_peek_buried(int fd, BSJ **job) { + return bs_peek_job(fd, "peek-buried\r\n", job); +} + +int bs_kick(int fd, int bound) { + BSM *message; + char command[512]; + + snprintf(command, 512, "kick %d\r\n", bound); + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + BS_CHECK_MESSAGE(message); + BS_RETURN_OK_WHEN(message, bs_resp_kicked); + BS_RETURN_INVALID(message); +} + +int bs_list_tube_used(int fd, char **tube) { + BSM *message; + char command[64]; + snprintf(command, 64, "list-tube-used\r\n"); + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_NO_BODY); + if (BS_STATUS_IS(message->status, bs_resp_using)) { + *tube = (char*)calloc(1, strlen(message->status) - strlen(bs_resp_using) + 1); + strcpy(*tube, message->status + strlen(bs_resp_using) + 1); + bs_free_message(message); + return BS_STATUS_OK; + } + BS_RETURN_INVALID(message); +} + +int bs_get_info(int fd, char *command, char **yaml) { + BSM *message; + size_t size; + + BS_SEND(fd, command, strlen(command)); + message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY); + BS_CHECK_MESSAGE(message); + if (BS_STATUS_IS(message->status, bs_resp_ok)) { + sscanf(message->status + strlen(bs_resp_ok) + 1, "%lu", &size); + *yaml = message->data; + (*yaml)[size] = 0; + message->data = 0; + bs_free_message(message); + return BS_STATUS_OK; + } + + BS_RETURN_INVALID(message); +} + +int bs_list_tubes(int fd, char **yaml) { + char command[64]; + snprintf(command, 64, "list-tubes\r\n"); + return bs_get_info(fd, command, yaml); +} + +int bs_list_tubes_watched(int fd, char **yaml) { + char command[64]; + snprintf(command, 64, "list-tubes-watched\r\n"); + return bs_get_info(fd, command, yaml); +} + +int bs_stats(int fd, char **yaml) { + char command[64]; + snprintf(command, 64, "stats\r\n"); + return bs_get_info(fd, command, yaml); +} + +int bs_stats_job(int fd, int64_t id, char **yaml) { + char command[128]; + snprintf(command, 128, "stats-job %"PRId64"\r\n", id); + return bs_get_info(fd, command, yaml); +} + +int bs_stats_tube(int fd, char *tube, char **yaml) { + char command[512]; + snprintf(command, 512, "stats-tube %s\r\n", tube); + return bs_get_info(fd, command, yaml); +} + +void bs_version(int *major, int *minor, int *patch) +{ + *major = BS_MAJOR_VERSION; + *minor = BS_MINOR_VERSION; + *patch = BS_PATCH_VERSION; +} diff --git a/src/daemon/beanstalk.cc b/src/daemon/beanstalk.cc new file mode 100644 index 0000000..cc8ee32 --- /dev/null +++ b/src/daemon/beanstalk.cc @@ -0,0 +1,317 @@ +#include "beanstalk.hpp" +#include +#include +#include + +using namespace std; + +namespace Beanstalk { + + Job::Job() { + _id = 0; + } + + Job::Job(int64_t id, char *data, size_t size) { + _body.assign(data, size); + _id = id; + } + + Job::Job(BSJ *job) { + if (job) { + _body.assign(job->data, job->size); + _id = job->id; + bs_free_job(job); + } + else { + _id = 0; + } + } + + string& Job::body() { + return _body; + } + + int64_t Job::id() { + return _id; + } + + /* start helpers */ + + void parsedict(stringstream &stream, info_hash_t &dict) { + string key, value; + while(true) { + stream >> key; + if (stream.eof()) break; + if (key[0] == '-') continue; + stream >> value; + key.erase(--key.end()); + dict[key] = value; + } + } + + void parselist(stringstream &stream, info_list_t &list) { + string value; + while(true) { + stream >> value; + if (stream.eof()) break; + if (value[0] == '-') continue; + list.push_back(value); + } + } + + /* end helpers */ + + Client::~Client() { + if (handle > 0) + bs_disconnect(handle); + handle = -1; + } + + Client::Client() { + handle = -1; + host = ""; + port = 0; + timeout_secs = 0; + } + + Client::Client(string host, int port, float secs) { + handle = -1; + timeout_secs = secs; + connect(host, port, timeout_secs); + } + + void Client::connect(string _host, int _port, float secs) { + if (handle > 0) + throw runtime_error("already connected to beanstalkd at " + host); + + host = _host; + port = _port; + timeout_secs = secs; + + handle = secs > 0 ? bs_connect_with_timeout((char *)host.c_str(), port, secs) : bs_connect((char*)host.c_str(), port); + if (handle < 0) + throw runtime_error("unable to connect to beanstalkd at " + host); + } + + bool Client::is_connected() { + return handle > 0; + } + + bool Client::disconnect() { + if (handle > 0 && bs_disconnect(handle) == BS_STATUS_OK) { + handle = -1; + return true; + } + return false; + } + + void Client::version(int *major, int *minor, int *patch) { + bs_version(major, minor, patch); + } + + void Client::reconnect() { + disconnect(); + connect(host, port, timeout_secs); + } + + bool Client::use(string tube) { + return bs_use(handle, (char*)tube.c_str()) == BS_STATUS_OK; + } + + bool Client::watch(string tube) { + return bs_watch(handle, (char*)tube.c_str()) == BS_STATUS_OK; + } + + bool Client::ignore(string tube) { + return bs_ignore(handle, (char*)tube.c_str()) == BS_STATUS_OK; + } + + int64_t Client::put(string body, uint32_t priority, uint32_t delay, uint32_t ttr) { + int64_t id = bs_put(handle, priority, delay, ttr, (char*)body.data(), body.size()); + return (id > 0 ? id : 0); + } + + int64_t Client::put(char *body, size_t bytes, uint32_t priority, uint32_t delay, uint32_t ttr) { + int64_t id = bs_put(handle, priority, delay, ttr, body, bytes); + return (id > 0 ? id : 0); + } + + bool Client::del(Job &job) { + return bs_delete(handle, job.id()) == BS_STATUS_OK; + } + + bool Client::del(int64_t id) { + return bs_delete(handle, id) == BS_STATUS_OK; + } + + bool Client::reserve(Job &job) { + BSJ *bsj; + if (bs_reserve(handle, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::reserve(Job &job, uint32_t timeout) { + BSJ *bsj; + if (bs_reserve_with_timeout(handle, timeout, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::release(Job &job, uint32_t priority, uint32_t delay) { + return bs_release(handle, job.id(), priority, delay) == BS_STATUS_OK; + } + + bool Client::release(int64_t id, uint32_t priority, uint32_t delay) { + return bs_release(handle, id, priority, delay) == BS_STATUS_OK; + } + + bool Client::bury(Job &job, uint32_t priority) { + return bs_bury(handle, job.id(), priority) == BS_STATUS_OK; + } + + bool Client::bury(int64_t id, uint32_t priority) { + return bs_bury(handle, id, priority) == BS_STATUS_OK; + } + + bool Client::touch(Job &job) { + return bs_touch(handle, job.id()) == BS_STATUS_OK; + } + + bool Client::touch(int64_t id) { + return bs_touch(handle, id) == BS_STATUS_OK; + } + + bool Client::peek(Job &job, int64_t id) { + BSJ *bsj; + if (bs_peek(handle, id, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::peek_ready(Job &job) { + BSJ *bsj; + if (bs_peek_ready(handle, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::peek_delayed(Job &job) { + BSJ *bsj; + if (bs_peek_delayed(handle, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::peek_buried(Job &job) { + BSJ *bsj; + if (bs_peek_buried(handle, &bsj) == BS_STATUS_OK) { + job = bsj; + return true; + } + return false; + } + + bool Client::kick(int bound) { + return bs_kick(handle, bound) == BS_STATUS_OK; + } + + string Client::list_tube_used() { + char *name; + string tube; + if (bs_list_tube_used(handle, &name) == BS_STATUS_OK) { + tube.assign(name); + free(name); + } + + return tube; + } + + info_list_t Client::list_tubes() { + char *yaml, *data; + info_list_t tubes; + if (bs_list_tubes(handle, &yaml) == BS_STATUS_OK) { + if ((data = strstr(yaml, "---"))) { + stringstream stream(data); + parselist(stream, tubes); + } + free(yaml); + } + return tubes; + } + + info_list_t Client::list_tubes_watched() { + char *yaml, *data; + info_list_t tubes; + if (bs_list_tubes_watched(handle, &yaml) == BS_STATUS_OK) { + if ((data = strstr(yaml, "---"))) { + stringstream stream(data); + parselist(stream, tubes); + } + free(yaml); + } + return tubes; + } + + info_hash_t Client::stats() { + char *yaml, *data; + info_hash_t stats; + string key, value; + if (bs_stats(handle, &yaml) == BS_STATUS_OK) { + if ((data = strstr(yaml, "---"))) { + stringstream stream(data); + parsedict(stream, stats); + } + free(yaml); + } + return stats; + } + + info_hash_t Client::stats_job(int64_t id) { + char *yaml, *data; + info_hash_t stats; + string key, value; + if (bs_stats_job(handle, id, &yaml) == BS_STATUS_OK) { + if ((data = strstr(yaml, "---"))) { + stringstream stream(data); + parsedict(stream, stats); + } + free(yaml); + } + return stats; + } + + info_hash_t Client::stats_tube(string name) { + char *yaml, *data; + info_hash_t stats; + string key, value; + if (bs_stats_tube(handle, (char*)name.c_str(), &yaml) == BS_STATUS_OK) { + if ((data = strstr(yaml, "---"))) { + stringstream stream(data); + parsedict(stream, stats); + } + free(yaml); + } + return stats; + } + + bool Client::ping() { + char *yaml; + if (bs_list_tubes(handle, &yaml) == BS_STATUS_OK) { + free(yaml); + return true; + } + + return false; + } +} diff --git a/src/daemon/beanstalk.h b/src/daemon/beanstalk.h new file mode 100644 index 0000000..be245a1 --- /dev/null +++ b/src/daemon/beanstalk.h @@ -0,0 +1,114 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define BS_MAJOR_VERSION 1 +#define BS_MINOR_VERSION 2 +#define BS_PATCH_VERSION 0 + +#define BS_STATUS_OK 0 +#define BS_STATUS_FAIL -1 +#define BS_STATUS_EXPECTED_CRLF -2 +#define BS_STATUS_JOB_TOO_BIG -3 +#define BS_STATUS_DRAINING -4 +#define BS_STATUS_TIMED_OUT -5 +#define BS_STATUS_NOT_FOUND -6 +#define BS_STATUS_DEADLINE_SOON -7 +#define BS_STATUS_BURIED -8 +#define BS_STATUS_NOT_IGNORED -9 + +#ifdef __cplusplus + extern "C" { +#endif + +typedef struct bs_message { + char *data; + char *status; + size_t size; +} BSM; + +typedef struct bs_job { + int64_t id; + char *data; + size_t size; +} BSJ; + + +// optional polling call, returns 1 if the socket is ready of the rw operation specified. +// rw: 1 => read, 2 => write, 3 => read/write +// fd: file descriptor of the socket +typedef int (*bs_poll_function)(int rw, int fd); + +/* Handle DSO symbol visibility - Stolen from zmq.h */ +#if defined _WIN32 +# if defined DLL_EXPORT +# define BSC_EXPORT __declspec(dllexport) +# else +# define BSC_EXPORT __declspec(dllimport) +# endif +#else +# if defined __SUNPRO_C || defined __SUNPRO_CC +# define BSC_EXPORT __global +# elif (defined __GNUC__ && __GNUC__ >= 4) || defined __INTEL_COMPILER +# define BSC_EXPORT __attribute__ ((visibility("default"))) +# else +# define BSC_EXPORT +# endif +#endif + +// export version +BSC_EXPORT void bs_version(int *major, int *minor, int *patch); + +// polling setup +BSC_EXPORT void bs_start_polling(bs_poll_function f); +BSC_EXPORT void bs_reset_polling(void); + +// returns a descriptive text of the error code. +BSC_EXPORT const char* bs_status_text(int code); + +BSC_EXPORT void bs_free_message(BSM* m); +BSC_EXPORT void bs_free_job(BSJ *job); + +// returns socket descriptor or BS_STATUS_FAIL +BSC_EXPORT int bs_connect(char *host, int port); +BSC_EXPORT int bs_connect_with_timeout(char *host, int port, float secs); + +// returns job id or one of the negative failure codes. +BSC_EXPORT int64_t bs_put(int fd, uint32_t priority, uint32_t delay, uint32_t ttr, char *data, size_t bytes); + +// rest return BS_STATUS_OK or one of the failure codes. +BSC_EXPORT int bs_disconnect(int fd); +BSC_EXPORT int bs_use(int fd, char *tube); +BSC_EXPORT int bs_watch(int fd, char *tube); +BSC_EXPORT int bs_ignore(int fd, char *tube); +BSC_EXPORT int bs_delete(int fd, int64_t job); +BSC_EXPORT int bs_reserve(int fd, BSJ **job); +BSC_EXPORT int bs_reserve_with_timeout(int fd, uint32_t ttl, BSJ **job); +BSC_EXPORT int bs_release(int fd, int64_t id, uint32_t priority, uint32_t delay); +BSC_EXPORT int bs_bury(int fd, int64_t id, uint32_t priority); +BSC_EXPORT int bs_touch(int fd, int64_t id); +BSC_EXPORT int bs_peek(int fd, int64_t id, BSJ **job); +BSC_EXPORT int bs_peek_ready(int fd, BSJ **job); +BSC_EXPORT int bs_peek_delayed(int fd, BSJ **job); +BSC_EXPORT int bs_peek_buried(int fd, BSJ **job); +BSC_EXPORT int bs_kick(int fd, int bound); +BSC_EXPORT int bs_list_tube_used(int fd, char **tube); +BSC_EXPORT int bs_list_tubes(int fd, char **yaml); +BSC_EXPORT int bs_list_tubes_watched(int fd, char **yaml); +BSC_EXPORT int bs_stats(int fd, char **yaml); +BSC_EXPORT int bs_stats_job(int fd, int64_t id, char **yaml); +BSC_EXPORT int bs_stats_tube(int fd, char *tube, char **yaml); + +#ifdef __cplusplus + } +#endif diff --git a/src/daemon/beanstalk.hpp b/src/daemon/beanstalk.hpp new file mode 100644 index 0000000..5c7bbc3 --- /dev/null +++ b/src/daemon/beanstalk.hpp @@ -0,0 +1,67 @@ +#pragma once + +#include "beanstalk.h" +#include +#include +#include + +namespace Beanstalk { + typedef std::vector info_list_t; + typedef std::map info_hash_t; + + class Job { + public: + int64_t id(); + std::string& body(); + Job(int64_t, char*, size_t); + Job(BSJ*); + Job(); + operator bool() { return _id > 0; } + protected: + int64_t _id; + std::string _body; + }; + + class Client { + public: + ~Client(); + Client(); + Client(std::string host, int port, float timeout_secs = 0); + bool ping(); + bool use(std::string); + bool watch(std::string); + bool ignore(std::string); + int64_t put(std::string, uint32_t priority = 0, uint32_t delay = 0, uint32_t ttr = 60); + int64_t put(char *data, size_t bytes, uint32_t priority, uint32_t delay, uint32_t ttr); + bool del(int64_t id); + bool del(Job&); + bool reserve(Job &); + bool reserve(Job &, uint32_t timeout); + bool release(Job &, uint32_t priority = 1, uint32_t delay = 0); + bool release(int64_t id, uint32_t priority = 1, uint32_t delay = 0); + bool bury(Job &, uint32_t priority = 1); + bool bury(int64_t id, uint32_t priority = 1); + bool touch(Job &); + bool touch(int64_t id); + bool peek(Job &, int64_t id); + bool peek_ready(Job &); + bool peek_delayed(Job &); + bool peek_buried(Job &); + bool kick(int bound); + void connect(std::string host, int port, float timeout_secs = 0); + void reconnect(); + bool disconnect(); + void version(int *major, int *minor, int *patch); + bool is_connected(); + std::string list_tube_used(); + info_list_t list_tubes(); + info_list_t list_tubes_watched(); + info_hash_t stats(); + info_hash_t stats_job(int64_t); + info_hash_t stats_tube(std::string); + protected: + float timeout_secs; + int handle, port; + std::string host; + }; +} diff --git a/src/daemon/uuid.cpp b/src/daemon/uuid.cpp new file mode 100644 index 0000000..f389a2b --- /dev/null +++ b/src/daemon/uuid.cpp @@ -0,0 +1,31 @@ +#include "uuid.h" + +extern "C" +{ +#ifdef WIN32 +#include +#else +#include +#endif +} + +std::string newUUID() +{ +#ifdef WIN32 + UUID uuid; + UuidCreate ( &uuid ); + + unsigned char * str; + UuidToStringA ( &uuid, &str ); + + std::string s( ( char* ) str ); + + RpcStringFreeA ( &str ); +#else + uuid_t uuid; + uuid_generate_random ( uuid ); + char s[37]; + uuid_unparse ( uuid, s ); +#endif + return s; +} \ No newline at end of file diff --git a/src/daemon/uuid.h b/src/daemon/uuid.h new file mode 100644 index 0000000..952e887 --- /dev/null +++ b/src/daemon/uuid.h @@ -0,0 +1,11 @@ +#ifndef OPENALPR_UUID_H +#define OPENALPR_UUID_H + +#include +#include + + +std::string newUUID(); + + +#endif // OPENALPR_UUID_H \ No newline at end of file diff --git a/src/plate_push.py b/src/plate_push.py new file mode 100644 index 0000000..a7e0b7f --- /dev/null +++ b/src/plate_push.py @@ -0,0 +1,13 @@ +#!/usr/bin/python + +import beanstalkc + +beanstalk = beanstalkc.Connection(host='localhost', port=11300) + +beanstalk.watch('alpr') + +job = beanstalk.reserve() + +print job.body + +job.delete()