Started daemon work

This commit is contained in:
Matt Hill
2014-06-02 23:35:16 -04:00
parent caaf44120c
commit 801d68be02
6 changed files with 1280 additions and 0 deletions

View File

@@ -62,6 +62,7 @@ include_directories(./openalpr )
set(CMAKE_CSS_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall ")
ADD_EXECUTABLE( alpr main.cpp videobuffer.cpp )
ADD_EXECUTABLE( alprd daemon.cpp videobuffer.cpp beanstalk.c beanstalk.cc )
TARGET_LINK_LIBRARIES(alpr
@@ -71,6 +72,15 @@ TARGET_LINK_LIBRARIES(alpr
${Tesseract_LIBS}
)
TARGET_LINK_LIBRARIES(alprd
openalpr
support
${OpenCV_LIBS}
${Tesseract_LIBS}
)
add_subdirectory(openalpr)
add_subdirectory(misc_utilities)

645
src/beanstalk.c Normal file
View File

@@ -0,0 +1,645 @@
#include "beanstalk.h"
#include <sys/time.h>
#include <errno.h>
#include <assert.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <inttypes.h>
#include <poll.h>
#define BS_STATUS_IS(message, code) strncmp(message, code, strlen(code)) == 0
#define BS_MESSAGE_NO_BODY 0
#define BS_MESSAGE_HAS_BODY 1
#ifndef BS_READ_CHUNK_SIZE
#define BS_READ_CHUNK_SIZE 4096
#endif
#define DATA_PENDING (errno == EAGAIN || errno == EWOULDBLOCK)
const char *bs_status_verbose[] = {
"Success",
"Operation failed",
"Expected CRLF",
"Job too big",
"Queue draining",
"Timed out",
"Not found",
"Deadline soon",
"Buried",
"Not ignored"
};
const char bs_resp_using[] = "USING";
const char bs_resp_watching[] = "WATCHING";
const char bs_resp_inserted[] = "INSERTED";
const char bs_resp_buried[] = "BURIED";
const char bs_resp_expected_crlf[] = "EXPECTED_CRLF";
const char bs_resp_job_too_big[] = "JOB_TOO_BIG";
const char bs_resp_draining[] = "DRAINING";
const char bs_resp_reserved[] = "RESERVED";
const char bs_resp_deadline_soon[] = "DEADLINE_SOON";
const char bs_resp_timed_out[] = "TIMED_OUT";
const char bs_resp_deleted[] = "DELETED";
const char bs_resp_not_found[] = "NOT_FOUND";
const char bs_resp_released[] = "RELEASED";
const char bs_resp_touched[] = "TOUCHED";
const char bs_resp_not_ignored[] = "NOT_IGNORED";
const char bs_resp_found[] = "FOUND";
const char bs_resp_kicked[] = "KICKED";
const char bs_resp_ok[] = "OK";
const char* bs_status_text(int code) {
unsigned int cindex = (unsigned int) abs(code);
return (cindex > sizeof(bs_status_verbose) / sizeof(char*)) ? 0 : bs_status_verbose[cindex];
}
int bs_resolve_address(char *host, int port, struct sockaddr_in *server) {
char service[64];
struct addrinfo *addr, *rec;
snprintf(service, 64, "%d", port);
if (getaddrinfo(host, service, 0, &addr) != 0)
return BS_STATUS_FAIL;
for (rec = addr; rec != 0; rec = rec->ai_next) {
if (rec->ai_family == AF_INET) {
memcpy(server, rec->ai_addr, sizeof(*server));
break;
}
}
freeaddrinfo(addr);
return BS_STATUS_OK;
}
int bs_connect(char *host, int port) {
int fd, state = 1;
struct sockaddr_in server;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0 || bs_resolve_address(host, port, &server) < 0)
return BS_STATUS_FAIL;
if (connect(fd, (struct sockaddr*)&server, sizeof(server)) != 0) {
close(fd);
return BS_STATUS_FAIL;
}
/* disable nagle - we buffer in the application layer */
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &state, sizeof(state));
return fd;
}
int bs_connect_with_timeout(char *host, int port, float secs) {
struct sockaddr_in server;
int fd, res, option, state = 1;
socklen_t option_length;
struct pollfd pfd;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0 || bs_resolve_address(host, port, &server) < 0)
return BS_STATUS_FAIL;
// Set non-blocking
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, NULL) | O_NONBLOCK);
res = connect(fd, (struct sockaddr*)&server, sizeof(server));
if (res < 0) {
if (errno == EINPROGRESS) {
// Init poll structure
pfd.fd = fd;
pfd.events = POLLOUT;
if (poll(&pfd, 1, (int)(secs*1000)) > 0) {
option_length = sizeof(int);
getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)(&option), &option_length);
if (option) {
close(fd);
return BS_STATUS_FAIL;
}
} else {
close(fd);
return BS_STATUS_FAIL;
}
} else {
close(fd);
return BS_STATUS_FAIL;
}
}
// Set to blocking mode
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, NULL) & ~(O_NONBLOCK));
/* disable nagle - we buffer in the application layer */
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &state, sizeof(state));
return fd;
}
int bs_disconnect(int fd) {
close(fd);
return BS_STATUS_OK;
}
void bs_free_message(BSM* m) {
if (m->status)
free(m->status);
if (m->data)
free(m->data);
free(m);
}
void bs_free_job(BSJ *job) {
if (job->data)
free(job->data);
free(job);
}
// optional polling
bs_poll_function bs_poll = 0;
void bs_start_polling(bs_poll_function f) {
bs_poll = f;
}
void bs_reset_polling() {
bs_poll = 0;
}
BSM* bs_recv_message(int fd, int expect_data) {
char *token, *data;
size_t bytes, data_size, status_size, status_max = 512, expect_data_bytes = 0;
ssize_t ret;
BSM *message = (BSM*)calloc(1, sizeof(BSM));
if (!message) return 0;
message->status = (char*)calloc(1, status_max);
if (!message->status) {
bs_free_message(message);
return 0;
}
// poll until ready to read
if (bs_poll) bs_poll(1, fd);
ret = recv(fd, message->status, status_max - 1, 0);
if (ret < 0) {
bs_free_message(message);
return 0;
} else {
bytes = (size_t) ret;
}
token = strstr(message->status, "\r\n");
if (!token) {
bs_free_message(message);
return 0;
}
*token = 0;
status_size = (size_t) (token - message->status);
if (expect_data) {
token = rindex(message->status, ' ');
expect_data_bytes = token ? strtoul(token + 1, NULL, 10) : 0;
}
if (!expect_data || expect_data_bytes == 0)
return message;
message->size = bytes - status_size - 2;
data_size = message->size > BS_READ_CHUNK_SIZE ? message->size + BS_READ_CHUNK_SIZE : BS_READ_CHUNK_SIZE;
message->data = (char*)malloc(data_size);
if (!message->data) {
bs_free_message(message);
return 0;
}
memcpy(message->data, message->status + status_size + 2, message->size);
data = message->data + message->size;
// already read the body along with status, all good.
if ((expect_data_bytes + 2) <= message->size) {
message->size = expect_data_bytes;
return message;
}
while (1) {
// poll until ready to read.
if (bs_poll) bs_poll(1, fd);
ret = recv(fd, data, data_size - message->size, 0);
if (ret < 0) {
if (bs_poll && DATA_PENDING)
continue;
else {
bs_free_message(message);
return 0;
}
} else {
bytes = (size_t) ret;
}
// doneski, we have read enough bytes + \r\n
if (message->size + bytes >= expect_data_bytes + 2) {
message->size = expect_data_bytes;
break;
}
data_size += BS_READ_CHUNK_SIZE;
message->size += bytes;
message->data = (char*)realloc(message->data, data_size);
if (!message->data) {
bs_free_message(message);
return 0;
}
// move ahead pointer for reading more.
data = message->data + message->size;
}
return message;
}
ssize_t bs_send_message(int fd, char *message, size_t size) {
// poll until ready to write.
if (bs_poll) bs_poll(2, fd);
return send(fd, message, size, bs_poll ? MSG_DONTWAIT : 0);
}
typedef struct bs_message_packet {
char *data;
size_t offset;
size_t size;
} BSMP;
BSMP* bs_message_packet_new(size_t bytes) {
BSMP *packet = (BSMP*)malloc(sizeof(BSMP));
assert(packet);
packet->data = (char*)malloc(bytes);
assert(packet->data);
packet->offset = 0;
packet->size = bytes;
return packet;
}
void bs_message_packet_append(BSMP *packet, char *data, size_t bytes) {
if (packet->offset + bytes > packet->size) {
packet->data = (char*)realloc(packet->data, packet->size + bytes);
assert(packet->data);
packet->size += bytes;
}
memcpy(packet->data + packet->offset, data, bytes);
packet->offset += bytes;
}
void bs_message_packet_free(BSMP *packet) {
free(packet->data);
free(packet);
}
#define BS_SEND(fd, command, size) { \
if (bs_send_message(fd, command, size) < 0) \
return BS_STATUS_FAIL; \
}
#define BS_CHECK_MESSAGE(message) { \
if (!message) \
return BS_STATUS_FAIL; \
}
#define BS_RETURN_OK_WHEN(message, okstatus) { \
if (BS_STATUS_IS(message->status, okstatus)) { \
bs_free_message(message); \
return BS_STATUS_OK; \
} \
}
#define BS_RETURN_FAIL_WHEN(message, nokstatus, nokcode) { \
if (BS_STATUS_IS(message->status, nokstatus)) { \
bs_free_message(message); \
return nokcode; \
} \
}
#define BS_RETURN_INVALID(message) { \
bs_free_message(message); \
return BS_STATUS_FAIL; \
}
int bs_use(int fd, char *tube) {
BSM *message;
char command[1024];
snprintf(command, 1024, "use %s\r\n", tube);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_using);
BS_RETURN_INVALID(message);
}
int bs_watch(int fd, char *tube) {
BSM *message;
char command[1024];
snprintf(command, 1024, "watch %s\r\n", tube);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_watching);
BS_RETURN_INVALID(message);
}
int bs_ignore(int fd, char *tube) {
BSM *message;
char command[1024];
snprintf(command, 1024, "ignore %s\r\n", tube);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_watching);
BS_RETURN_INVALID(message);
}
int64_t bs_put(int fd, uint32_t priority, uint32_t delay, uint32_t ttr, char *data, size_t bytes) {
int64_t id;
BSMP *packet;
BSM *message;
char command[1024];
size_t command_bytes;
snprintf(command, 1024, "put %"PRIu32" %"PRIu32" %"PRIu32" %lu\r\n", priority, delay, ttr, bytes);
command_bytes = strlen(command);
packet = bs_message_packet_new(command_bytes + bytes + 3);
bs_message_packet_append(packet, command, command_bytes);
bs_message_packet_append(packet, data, bytes);
bs_message_packet_append(packet, "\r\n", 2);
// Can't use BS_SEND here, allocated memory needs to
// be cleared on error
int ret_code = bs_send_message(fd, packet->data, packet->offset);
bs_message_packet_free(packet);
if (ret_code <0) {
return BS_STATUS_FAIL;
}
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
if (BS_STATUS_IS(message->status, bs_resp_inserted)) {
id = strtoll(message->status + strlen(bs_resp_inserted) + 1, NULL, 10);
bs_free_message(message);
return id;
}
if (BS_STATUS_IS(message->status, bs_resp_buried)) {
id = strtoll(message->status + strlen(bs_resp_buried) + 1, NULL, 10);
bs_free_message(message);
return id;
}
BS_RETURN_FAIL_WHEN(message, bs_resp_expected_crlf, BS_STATUS_EXPECTED_CRLF);
BS_RETURN_FAIL_WHEN(message, bs_resp_job_too_big, BS_STATUS_JOB_TOO_BIG);
BS_RETURN_FAIL_WHEN(message, bs_resp_draining, BS_STATUS_DRAINING);
BS_RETURN_INVALID(message);
}
int bs_delete(int fd, int64_t job) {
BSM *message;
char command[512];
snprintf(command, 512, "delete %"PRId64"\r\n", job);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_deleted);
BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND);
BS_RETURN_INVALID(message);
}
int bs_reserve_job(int fd, char *command, BSJ **result) {
BSJ *job;
BSM *message;
// XXX: debug
// struct timeval start, end;
// gettimeofday(&start, 0);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY);
BS_CHECK_MESSAGE(message);
if (BS_STATUS_IS(message->status, bs_resp_reserved)) {
*result = job = (BSJ*)malloc(sizeof(BSJ));
if (!job) {
bs_free_message(message);
return BS_STATUS_FAIL;
}
sscanf(message->status + strlen(bs_resp_reserved) + 1, "%"PRId64" %lu", &job->id, &job->size);
job->data = message->data;
message->data = 0;
bs_free_message(message);
// XXX: debug
// gettimeofday(&end, 0);
// printf("elapsed: %lu\n", (end.tv_sec * 1000000 + end.tv_usec) - (start.tv_sec * 1000000 + start.tv_usec));
return BS_STATUS_OK;
}
// i don't think we'll ever hit this status code here.
BS_RETURN_FAIL_WHEN(message, bs_resp_timed_out, BS_STATUS_TIMED_OUT);
BS_RETURN_FAIL_WHEN(message, bs_resp_deadline_soon, BS_STATUS_DEADLINE_SOON);
BS_RETURN_INVALID(message);
}
int bs_reserve(int fd, BSJ **result) {
char *command = "reserve\r\n";
return bs_reserve_job(fd, command, result);
}
int bs_reserve_with_timeout(int fd, uint32_t ttl, BSJ **result) {
char command[512];
snprintf(command, 512, "reserve-with-timeout %"PRIu32"\r\n", ttl);
return bs_reserve_job(fd, command, result);
}
int bs_release(int fd, int64_t id, uint32_t priority, uint32_t delay) {
BSM *message;
char command[512];
snprintf(command, 512, "release %"PRId64" %"PRIu32" %"PRIu32"\r\n", id, priority, delay);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_released);
BS_RETURN_FAIL_WHEN(message, bs_resp_buried, BS_STATUS_BURIED);
BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND);
BS_RETURN_INVALID(message);
}
int bs_bury(int fd, int64_t id, uint32_t priority) {
BSM *message;
char command[512];
snprintf(command, 512, "bury %"PRId64" %"PRIu32"\r\n", id, priority);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_buried);
BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND);
BS_RETURN_INVALID(message);
}
int bs_touch(int fd, int64_t id) {
BSM *message;
char command[512];
snprintf(command, 512, "touch %"PRId64"\r\n", id);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_touched);
BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND);
BS_RETURN_INVALID(message);
}
int bs_peek_job(int fd, char *command, BSJ **result) {
BSJ *job;
BSM *message;
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY);
BS_CHECK_MESSAGE(message);
if (BS_STATUS_IS(message->status, bs_resp_found)) {
*result = job = (BSJ*)malloc(sizeof(BSJ));
if (!job) {
bs_free_message(message);
return BS_STATUS_FAIL;
}
sscanf(message->status + strlen(bs_resp_found) + 1, "%"PRId64" %lu", &job->id, &job->size);
job->data = message->data;
message->data = 0;
bs_free_message(message);
return BS_STATUS_OK;
}
BS_RETURN_FAIL_WHEN(message, bs_resp_not_found, BS_STATUS_NOT_FOUND);
BS_RETURN_INVALID(message);
}
int bs_peek(int fd, int64_t id, BSJ **job) {
char command[512];
snprintf(command, 512, "peek %"PRId64"\r\n", id);
return bs_peek_job(fd, command, job);
}
int bs_peek_ready(int fd, BSJ **job) {
return bs_peek_job(fd, "peek-ready\r\n", job);
}
int bs_peek_delayed(int fd, BSJ **job) {
return bs_peek_job(fd, "peek-delayed\r\n", job);
}
int bs_peek_buried(int fd, BSJ **job) {
return bs_peek_job(fd, "peek-buried\r\n", job);
}
int bs_kick(int fd, int bound) {
BSM *message;
char command[512];
snprintf(command, 512, "kick %d\r\n", bound);
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
BS_CHECK_MESSAGE(message);
BS_RETURN_OK_WHEN(message, bs_resp_kicked);
BS_RETURN_INVALID(message);
}
int bs_list_tube_used(int fd, char **tube) {
BSM *message;
char command[64];
snprintf(command, 64, "list-tube-used\r\n");
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_NO_BODY);
if (BS_STATUS_IS(message->status, bs_resp_using)) {
*tube = (char*)calloc(1, strlen(message->status) - strlen(bs_resp_using) + 1);
strcpy(*tube, message->status + strlen(bs_resp_using) + 1);
bs_free_message(message);
return BS_STATUS_OK;
}
BS_RETURN_INVALID(message);
}
int bs_get_info(int fd, char *command, char **yaml) {
BSM *message;
size_t size;
BS_SEND(fd, command, strlen(command));
message = bs_recv_message(fd, BS_MESSAGE_HAS_BODY);
BS_CHECK_MESSAGE(message);
if (BS_STATUS_IS(message->status, bs_resp_ok)) {
sscanf(message->status + strlen(bs_resp_ok) + 1, "%lu", &size);
*yaml = message->data;
(*yaml)[size] = 0;
message->data = 0;
bs_free_message(message);
return BS_STATUS_OK;
}
BS_RETURN_INVALID(message);
}
int bs_list_tubes(int fd, char **yaml) {
char command[64];
snprintf(command, 64, "list-tubes\r\n");
return bs_get_info(fd, command, yaml);
}
int bs_list_tubes_watched(int fd, char **yaml) {
char command[64];
snprintf(command, 64, "list-tubes-watched\r\n");
return bs_get_info(fd, command, yaml);
}
int bs_stats(int fd, char **yaml) {
char command[64];
snprintf(command, 64, "stats\r\n");
return bs_get_info(fd, command, yaml);
}
int bs_stats_job(int fd, int64_t id, char **yaml) {
char command[128];
snprintf(command, 128, "stats-job %"PRId64"\r\n", id);
return bs_get_info(fd, command, yaml);
}
int bs_stats_tube(int fd, char *tube, char **yaml) {
char command[512];
snprintf(command, 512, "stats-tube %s\r\n", tube);
return bs_get_info(fd, command, yaml);
}
void bs_version(int *major, int *minor, int *patch)
{
*major = BS_MAJOR_VERSION;
*minor = BS_MINOR_VERSION;
*patch = BS_PATCH_VERSION;
}

317
src/beanstalk.cc Normal file
View File

@@ -0,0 +1,317 @@
#include "beanstalk.hpp"
#include <stdexcept>
#include <sstream>
#include <iostream>
using namespace std;
namespace Beanstalk {
Job::Job() {
_id = 0;
}
Job::Job(int64_t id, char *data, size_t size) {
_body.assign(data, size);
_id = id;
}
Job::Job(BSJ *job) {
if (job) {
_body.assign(job->data, job->size);
_id = job->id;
bs_free_job(job);
}
else {
_id = 0;
}
}
string& Job::body() {
return _body;
}
int64_t Job::id() {
return _id;
}
/* start helpers */
void parsedict(stringstream &stream, info_hash_t &dict) {
string key, value;
while(true) {
stream >> key;
if (stream.eof()) break;
if (key[0] == '-') continue;
stream >> value;
key.erase(--key.end());
dict[key] = value;
}
}
void parselist(stringstream &stream, info_list_t &list) {
string value;
while(true) {
stream >> value;
if (stream.eof()) break;
if (value[0] == '-') continue;
list.push_back(value);
}
}
/* end helpers */
Client::~Client() {
if (handle > 0)
bs_disconnect(handle);
handle = -1;
}
Client::Client() {
handle = -1;
host = "";
port = 0;
timeout_secs = 0;
}
Client::Client(string host, int port, float secs) {
handle = -1;
timeout_secs = secs;
connect(host, port, timeout_secs);
}
void Client::connect(string _host, int _port, float secs) {
if (handle > 0)
throw runtime_error("already connected to beanstalkd at " + host);
host = _host;
port = _port;
timeout_secs = secs;
handle = secs > 0 ? bs_connect_with_timeout((char *)host.c_str(), port, secs) : bs_connect((char*)host.c_str(), port);
if (handle < 0)
throw runtime_error("unable to connect to beanstalkd at " + host);
}
bool Client::is_connected() {
return handle > 0;
}
bool Client::disconnect() {
if (handle > 0 && bs_disconnect(handle) == BS_STATUS_OK) {
handle = -1;
return true;
}
return false;
}
void Client::version(int *major, int *minor, int *patch) {
bs_version(major, minor, patch);
}
void Client::reconnect() {
disconnect();
connect(host, port, timeout_secs);
}
bool Client::use(string tube) {
return bs_use(handle, (char*)tube.c_str()) == BS_STATUS_OK;
}
bool Client::watch(string tube) {
return bs_watch(handle, (char*)tube.c_str()) == BS_STATUS_OK;
}
bool Client::ignore(string tube) {
return bs_ignore(handle, (char*)tube.c_str()) == BS_STATUS_OK;
}
int64_t Client::put(string body, uint32_t priority, uint32_t delay, uint32_t ttr) {
int64_t id = bs_put(handle, priority, delay, ttr, (char*)body.data(), body.size());
return (id > 0 ? id : 0);
}
int64_t Client::put(char *body, size_t bytes, uint32_t priority, uint32_t delay, uint32_t ttr) {
int64_t id = bs_put(handle, priority, delay, ttr, body, bytes);
return (id > 0 ? id : 0);
}
bool Client::del(Job &job) {
return bs_delete(handle, job.id()) == BS_STATUS_OK;
}
bool Client::del(int64_t id) {
return bs_delete(handle, id) == BS_STATUS_OK;
}
bool Client::reserve(Job &job) {
BSJ *bsj;
if (bs_reserve(handle, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::reserve(Job &job, uint32_t timeout) {
BSJ *bsj;
if (bs_reserve_with_timeout(handle, timeout, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::release(Job &job, uint32_t priority, uint32_t delay) {
return bs_release(handle, job.id(), priority, delay) == BS_STATUS_OK;
}
bool Client::release(int64_t id, uint32_t priority, uint32_t delay) {
return bs_release(handle, id, priority, delay) == BS_STATUS_OK;
}
bool Client::bury(Job &job, uint32_t priority) {
return bs_bury(handle, job.id(), priority) == BS_STATUS_OK;
}
bool Client::bury(int64_t id, uint32_t priority) {
return bs_bury(handle, id, priority) == BS_STATUS_OK;
}
bool Client::touch(Job &job) {
return bs_touch(handle, job.id()) == BS_STATUS_OK;
}
bool Client::touch(int64_t id) {
return bs_touch(handle, id) == BS_STATUS_OK;
}
bool Client::peek(Job &job, int64_t id) {
BSJ *bsj;
if (bs_peek(handle, id, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::peek_ready(Job &job) {
BSJ *bsj;
if (bs_peek_ready(handle, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::peek_delayed(Job &job) {
BSJ *bsj;
if (bs_peek_delayed(handle, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::peek_buried(Job &job) {
BSJ *bsj;
if (bs_peek_buried(handle, &bsj) == BS_STATUS_OK) {
job = bsj;
return true;
}
return false;
}
bool Client::kick(int bound) {
return bs_kick(handle, bound) == BS_STATUS_OK;
}
string Client::list_tube_used() {
char *name;
string tube;
if (bs_list_tube_used(handle, &name) == BS_STATUS_OK) {
tube.assign(name);
free(name);
}
return tube;
}
info_list_t Client::list_tubes() {
char *yaml, *data;
info_list_t tubes;
if (bs_list_tubes(handle, &yaml) == BS_STATUS_OK) {
if ((data = strstr(yaml, "---"))) {
stringstream stream(data);
parselist(stream, tubes);
}
free(yaml);
}
return tubes;
}
info_list_t Client::list_tubes_watched() {
char *yaml, *data;
info_list_t tubes;
if (bs_list_tubes_watched(handle, &yaml) == BS_STATUS_OK) {
if ((data = strstr(yaml, "---"))) {
stringstream stream(data);
parselist(stream, tubes);
}
free(yaml);
}
return tubes;
}
info_hash_t Client::stats() {
char *yaml, *data;
info_hash_t stats;
string key, value;
if (bs_stats(handle, &yaml) == BS_STATUS_OK) {
if ((data = strstr(yaml, "---"))) {
stringstream stream(data);
parsedict(stream, stats);
}
free(yaml);
}
return stats;
}
info_hash_t Client::stats_job(int64_t id) {
char *yaml, *data;
info_hash_t stats;
string key, value;
if (bs_stats_job(handle, id, &yaml) == BS_STATUS_OK) {
if ((data = strstr(yaml, "---"))) {
stringstream stream(data);
parsedict(stream, stats);
}
free(yaml);
}
return stats;
}
info_hash_t Client::stats_tube(string name) {
char *yaml, *data;
info_hash_t stats;
string key, value;
if (bs_stats_tube(handle, (char*)name.c_str(), &yaml) == BS_STATUS_OK) {
if ((data = strstr(yaml, "---"))) {
stringstream stream(data);
parsedict(stream, stats);
}
free(yaml);
}
return stats;
}
bool Client::ping() {
char *yaml;
if (bs_list_tubes(handle, &yaml) == BS_STATUS_OK) {
free(yaml);
return true;
}
return false;
}
}

114
src/beanstalk.h Normal file
View File

@@ -0,0 +1,114 @@
#pragma once
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/fcntl.h>
#define BS_MAJOR_VERSION 1
#define BS_MINOR_VERSION 2
#define BS_PATCH_VERSION 0
#define BS_STATUS_OK 0
#define BS_STATUS_FAIL -1
#define BS_STATUS_EXPECTED_CRLF -2
#define BS_STATUS_JOB_TOO_BIG -3
#define BS_STATUS_DRAINING -4
#define BS_STATUS_TIMED_OUT -5
#define BS_STATUS_NOT_FOUND -6
#define BS_STATUS_DEADLINE_SOON -7
#define BS_STATUS_BURIED -8
#define BS_STATUS_NOT_IGNORED -9
#ifdef __cplusplus
extern "C" {
#endif
typedef struct bs_message {
char *data;
char *status;
size_t size;
} BSM;
typedef struct bs_job {
int64_t id;
char *data;
size_t size;
} BSJ;
// optional polling call, returns 1 if the socket is ready of the rw operation specified.
// rw: 1 => read, 2 => write, 3 => read/write
// fd: file descriptor of the socket
typedef int (*bs_poll_function)(int rw, int fd);
/* Handle DSO symbol visibility - Stolen from zmq.h */
#if defined _WIN32
# if defined DLL_EXPORT
# define BSC_EXPORT __declspec(dllexport)
# else
# define BSC_EXPORT __declspec(dllimport)
# endif
#else
# if defined __SUNPRO_C || defined __SUNPRO_CC
# define BSC_EXPORT __global
# elif (defined __GNUC__ && __GNUC__ >= 4) || defined __INTEL_COMPILER
# define BSC_EXPORT __attribute__ ((visibility("default")))
# else
# define BSC_EXPORT
# endif
#endif
// export version
BSC_EXPORT void bs_version(int *major, int *minor, int *patch);
// polling setup
BSC_EXPORT void bs_start_polling(bs_poll_function f);
BSC_EXPORT void bs_reset_polling(void);
// returns a descriptive text of the error code.
BSC_EXPORT const char* bs_status_text(int code);
BSC_EXPORT void bs_free_message(BSM* m);
BSC_EXPORT void bs_free_job(BSJ *job);
// returns socket descriptor or BS_STATUS_FAIL
BSC_EXPORT int bs_connect(char *host, int port);
BSC_EXPORT int bs_connect_with_timeout(char *host, int port, float secs);
// returns job id or one of the negative failure codes.
BSC_EXPORT int64_t bs_put(int fd, uint32_t priority, uint32_t delay, uint32_t ttr, char *data, size_t bytes);
// rest return BS_STATUS_OK or one of the failure codes.
BSC_EXPORT int bs_disconnect(int fd);
BSC_EXPORT int bs_use(int fd, char *tube);
BSC_EXPORT int bs_watch(int fd, char *tube);
BSC_EXPORT int bs_ignore(int fd, char *tube);
BSC_EXPORT int bs_delete(int fd, int64_t job);
BSC_EXPORT int bs_reserve(int fd, BSJ **job);
BSC_EXPORT int bs_reserve_with_timeout(int fd, uint32_t ttl, BSJ **job);
BSC_EXPORT int bs_release(int fd, int64_t id, uint32_t priority, uint32_t delay);
BSC_EXPORT int bs_bury(int fd, int64_t id, uint32_t priority);
BSC_EXPORT int bs_touch(int fd, int64_t id);
BSC_EXPORT int bs_peek(int fd, int64_t id, BSJ **job);
BSC_EXPORT int bs_peek_ready(int fd, BSJ **job);
BSC_EXPORT int bs_peek_delayed(int fd, BSJ **job);
BSC_EXPORT int bs_peek_buried(int fd, BSJ **job);
BSC_EXPORT int bs_kick(int fd, int bound);
BSC_EXPORT int bs_list_tube_used(int fd, char **tube);
BSC_EXPORT int bs_list_tubes(int fd, char **yaml);
BSC_EXPORT int bs_list_tubes_watched(int fd, char **yaml);
BSC_EXPORT int bs_stats(int fd, char **yaml);
BSC_EXPORT int bs_stats_job(int fd, int64_t id, char **yaml);
BSC_EXPORT int bs_stats_tube(int fd, char *tube, char **yaml);
#ifdef __cplusplus
}
#endif

67
src/beanstalk.hpp Normal file
View File

@@ -0,0 +1,67 @@
#pragma once
#include "beanstalk.h"
#include <string>
#include <vector>
#include <map>
namespace Beanstalk {
typedef std::vector<std::string> info_list_t;
typedef std::map<std::string, std::string> info_hash_t;
class Job {
public:
int64_t id();
std::string& body();
Job(int64_t, char*, size_t);
Job(BSJ*);
Job();
operator bool() { return _id > 0; }
protected:
int64_t _id;
std::string _body;
};
class Client {
public:
~Client();
Client();
Client(std::string host, int port, float timeout_secs = 0);
bool ping();
bool use(std::string);
bool watch(std::string);
bool ignore(std::string);
int64_t put(std::string, uint32_t priority = 0, uint32_t delay = 0, uint32_t ttr = 60);
int64_t put(char *data, size_t bytes, uint32_t priority, uint32_t delay, uint32_t ttr);
bool del(int64_t id);
bool del(Job&);
bool reserve(Job &);
bool reserve(Job &, uint32_t timeout);
bool release(Job &, uint32_t priority = 1, uint32_t delay = 0);
bool release(int64_t id, uint32_t priority = 1, uint32_t delay = 0);
bool bury(Job &, uint32_t priority = 1);
bool bury(int64_t id, uint32_t priority = 1);
bool touch(Job &);
bool touch(int64_t id);
bool peek(Job &, int64_t id);
bool peek_ready(Job &);
bool peek_delayed(Job &);
bool peek_buried(Job &);
bool kick(int bound);
void connect(std::string host, int port, float timeout_secs = 0);
void reconnect();
bool disconnect();
void version(int *major, int *minor, int *patch);
bool is_connected();
std::string list_tube_used();
info_list_t list_tubes();
info_list_t list_tubes_watched();
info_hash_t stats();
info_hash_t stats_job(int64_t);
info_hash_t stats_tube(std::string);
protected:
float timeout_secs;
int handle, port;
std::string host;
};
}

127
src/daemon.cpp Normal file
View File

@@ -0,0 +1,127 @@
#include <unistd.h>
#include "beanstalk.hpp"
#include "alpr.h"
#include "openalpr/simpleini/simpleini.h"
#include "support/tinythread.h"
#include "videobuffer.h"
// prototypes
void streamRecognitionThread(void* arg);
struct ThreadData
{
std::string stream_url;
};
bool daemon_active;
int main( int argc, const char** argv )
{
daemon_active = true;
CSimpleIniA ini;
ini.SetMultiKey();
ini.LoadFile("/etc/openalpr/wts.conf");
std::vector<std::string> stream_urls;
CSimpleIniA::TNamesDepend values;
ini.GetAllValues("daemon", "stream", values);
// sort the values into the original load order
values.sort(CSimpleIniA::Entry::LoadOrder());
// output all of the items
CSimpleIniA::TNamesDepend::const_iterator i;
for (i = values.begin(); i != values.end(); ++i) {
stream_urls.push_back(i->pItem);
}
if (stream_urls.size() == 0)
{
std::cout << "No video streams defined in the configuration" << std::endl;
return 1;
}
for (int i = 0; i < stream_urls.size(); i++)
{
ThreadData* tdata = new ThreadData();
tdata->stream_url = stream_urls[i];
tthread::thread* t = new tthread::thread(streamRecognitionThread, (void*) tdata);
}
}
void streamRecognitionThread(void* arg)
{
ThreadData* tdata = (ThreadData*) arg;
std::cout << "Stream: " << tdata->stream_url << std::endl;
int framenum = 0;
VideoBuffer videoBuffer;
videoBuffer.connect(tdata->stream_url, 5);
cv::Mat latestFrame;
/*
while (daemon_active)
{
int response = videoBuffer.getLatestFrame(&latestFrame);
if (response != -1)
{
detectandshow( &alpr, latestFrame, "", outputJson);
}
//cv::waitKey(10);
}
*/
videoBuffer.disconnect();
std::cout << "Video processing ended" << std::endl;
delete tdata;
}
bool writeToQueue(AlprResult result)
{
Beanstalk::Client client("127.0.0.1", 11300);
client.use("test");
client.watch("test");
int id = client.put("hello");
if (id <= 0)
return 1;
std::cout << "put job id: " << id << std::endl;
Beanstalk::Job job;
client.reserve(job);
if (job.id() != id)
return 1;
std::cout << "reserved job id: "
<< job.id()
<< " with body {" << job.body() << "}"
<< std::endl;
client.del(job.id());
std::cout << "deleted job id: " << job.id() << std::endl;
}