Refactoring in prep for mirroring

This commit is contained in:
Adam Ierymenko 2019-08-06 07:51:50 -05:00
parent f4f8fef82e
commit 37d508ab96
No known key found for this signature in database
GPG key ID: 1657198823E52A61
10 changed files with 161 additions and 152 deletions

View file

@ -313,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
if (initialized) { if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l); std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig); (*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
} }
} }
} else if (memberId) { } else if (memberId) {
@ -336,7 +336,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l); std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberDeauthorize(networkId,memberId); (*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
} }
} }
} }
@ -362,7 +362,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
if (initialized) { if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l); std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkUpdate(networkId,networkConfig); (*i)->onNetworkUpdate(this,networkId,networkConfig);
} }
} }
} }

View file

@ -58,10 +58,10 @@ public:
public: public:
ChangeListener() {} ChangeListener() {}
virtual ~ChangeListener() {} virtual ~ChangeListener() {}
virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {} virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {}
virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {} virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {}
virtual void onNetworkMemberOnline(uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {} virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {}
}; };
struct NetworkSummaryInfo struct NetworkSummaryInfo
@ -95,12 +95,15 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
bool summary(const uint64_t networkId,NetworkSummaryInfo &info); bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks); void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0; virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0; virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0; virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
inline void addListener(DB::ChangeListener *const listener) inline void addListener(DB::ChangeListener *const listener)

View file

@ -1190,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
} }
} }
void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network) void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
{ {
// Send an update to all members of the network that are online // Send an update to all members of the network that are online
const int64_t now = OSUtils::now(); const int64_t now = OSUtils::now();
@ -1201,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const n
} }
} }
void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member) void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{ {
// Push update to member if online // Push update to member if online
try { try {
@ -1212,7 +1212,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,c
} catch ( ... ) {} } catch ( ... ) {}
} }
void EmbeddedNetworkController::onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId) void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
{ {
const int64_t now = OSUtils::now(); const int64_t now = OSUtils::now();
Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM); Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);

View file

@ -101,9 +101,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt); void handleRemoteTrace(const ZT_RemoteTrace &rt);
virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network); virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member); virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
private: private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData); void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);

View file

@ -178,7 +178,7 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
{ {
std::lock_guard<std::mutex> l2(_changeListeners_l); std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
(*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
} }
} }

View file

@ -404,7 +404,7 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I
{ {
std::lock_guard<std::mutex> l2(_changeListeners_l); std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
(*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
} }
} }

View file

@ -24,9 +24,11 @@
* of your own application. * of your own application.
*/ */
#include "PostgreSQL.hpp"
#ifdef ZT_CONTROLLER_USE_LIBPQ #ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PostgreSQL.hpp" #include "../node/Constants.hpp"
#include "EmbeddedNetworkController.hpp" #include "EmbeddedNetworkController.hpp"
#include "RabbitMQ.hpp" #include "RabbitMQ.hpp"
#include "../version.h" #include "../version.h"
@ -37,6 +39,7 @@
#include <amqp_tcp_socket.h> #include <amqp_tcp_socket.h>
using json = nlohmann::json; using json = nlohmann::json;
namespace { namespace {
static const int DB_MINIMUM_VERSION = 5; static const int DB_MINIMUM_VERSION = 5;
@ -73,16 +76,16 @@ std::string join(const std::vector<std::string> &elements, const char * const se
} }
} }
} } // anonymous namespace
using namespace ZeroTier; using namespace ZeroTier;
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(myId, path) : DB(myId, path)
, _ready(0) , _ready(0)
, _connected(1) , _connected(1)
, _run(1) , _run(1)
, _waitNoticePrinted(false) , _waitNoticePrinted(false)
, _listenPort(listenPort) , _listenPort(listenPort)
, _mqc(mqc) , _mqc(mqc)
{ {
@ -221,7 +224,7 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId,
{ {
std::lock_guard<std::mutex> l2(_changeListeners_l); std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
(*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress); (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
} }
} }
@ -602,8 +605,8 @@ void PostgreSQL::heartbeat()
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
"use_rabbitmq = EXCLUDED.use_rabbitmq", "use_rabbitmq = EXCLUDED.use_rabbitmq",
10, // number of parameters 10, // number of parameters
NULL, // oid field. ignore NULL, // oid field. ignore
values, // values for substitution values, // values for substitution
NULL, // lengths in bytes of each value NULL, // lengths in bytes of each value
NULL, // binary? NULL, // binary?
@ -724,7 +727,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
} catch(...) { } catch(...) {
fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
} }
} }
} }
@ -1324,7 +1327,7 @@ void PostgreSQL::onlineNotificationThread()
int64_t lastUpdatedNetworkStatus = 0; int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
while (_run == 1) { while (_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) { if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
@ -1438,7 +1441,8 @@ void PostgreSQL::onlineNotificationThread()
} }
} }
PGconn *PostgreSQL::getPgConn(OverrideMode m) { PGconn *PostgreSQL::getPgConn(OverrideMode m)
{
if (m == ALLOW_PGBOUNCER_OVERRIDE) { if (m == ALLOW_PGBOUNCER_OVERRIDE) {
char *connStr = getenv("PGBOUNCER_CONNSTR"); char *connStr = getenv("PGBOUNCER_CONNSTR");
if (connStr != NULL) { if (connStr != NULL) {
@ -1452,4 +1456,5 @@ PGconn *PostgreSQL::getPgConn(OverrideMode m) {
return PQconnectdb(_connString.c_str()); return PQconnectdb(_connString.c_str());
} }
#endif //ZT_CONTROLLER_USE_LIBPQ #endif //ZT_CONTROLLER_USE_LIBPQ

View file

@ -24,6 +24,8 @@
* of your own application. * of your own application.
*/ */
#define ZT_CONTROLLER_USE_LIBPQ
#ifdef ZT_CONTROLLER_USE_LIBPQ #ifdef ZT_CONTROLLER_USE_LIBPQ
#ifndef ZT_CONTROLLER_LIBPQ_HPP #ifndef ZT_CONTROLLER_LIBPQ_HPP
@ -34,11 +36,10 @@
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4 #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
extern "C" { extern "C" {
typedef struct pg_conn PGconn; typedef struct pg_conn PGconn;
} }
namespace ZeroTier namespace ZeroTier {
{
struct MQConfig; struct MQConfig;
@ -51,66 +52,66 @@ struct MQConfig;
class PostgreSQL : public DB class PostgreSQL : public DB
{ {
public: public:
PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL(); virtual ~PostgreSQL();
virtual bool waitForReady(); virtual bool waitForReady();
virtual bool isReady(); virtual bool isReady();
virtual void save(nlohmann::json *orig, nlohmann::json &record); virtual void save(nlohmann::json *orig, nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId); virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress); virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
protected: protected:
struct _PairHasher struct _PairHasher
{ {
inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); } inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); }
}; };
private: private:
void initializeNetworks(PGconn *conn); void initializeNetworks(PGconn *conn);
void initializeMembers(PGconn *conn); void initializeMembers(PGconn *conn);
void heartbeat(); void heartbeat();
void membersDbWatcher(); void membersDbWatcher();
void _membersWatcher_Postgres(PGconn *conn); void _membersWatcher_Postgres(PGconn *conn);
void _membersWatcher_RabbitMQ(); void _membersWatcher_RabbitMQ();
void networksDbWatcher(); void networksDbWatcher();
void _networksWatcher_Postgres(PGconn *conn); void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ(); void _networksWatcher_RabbitMQ();
void commitThread(); void commitThread();
void onlineNotificationThread(); void onlineNotificationThread();
enum OverrideMode { enum OverrideMode {
ALLOW_PGBOUNCER_OVERRIDE = 0, ALLOW_PGBOUNCER_OVERRIDE = 0,
NO_OVERRIDE = 1 NO_OVERRIDE = 1
}; };
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
std::string _connString; std::string _connString;
BlockingQueue<nlohmann::json *> _commitQueue; BlockingQueue<nlohmann::json *> _commitQueue;
std::thread _heartbeatThread; std::thread _heartbeatThread;
std::thread _membersDbWatcher; std::thread _membersDbWatcher;
std::thread _networksDbWatcher; std::thread _networksDbWatcher;
std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
std::thread _onlineNotificationThread; std::thread _onlineNotificationThread;
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline; std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline;
mutable std::mutex _lastOnline_l; mutable std::mutex _lastOnline_l;
mutable std::mutex _readyLock; mutable std::mutex _readyLock;
std::atomic<int> _ready, _connected, _run; std::atomic<int> _ready, _connected, _run;
mutable volatile bool _waitNoticePrinted; mutable volatile bool _waitNoticePrinted;
int _listenPort; int _listenPort;
MQConfig *_mqc; MQConfig *_mqc;
}; };
} } // namespace ZeroTier
#endif // ZT_CONTROLLER_LIBPQ_HPP #endif // ZT_CONTROLLER_LIBPQ_HPP

View file

@ -11,95 +11,95 @@ namespace ZeroTier
{ {
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
: _mqc(cfg) : _mqc(cfg)
, _qName(queueName) , _qName(queueName)
, _socket(NULL) , _socket(NULL)
, _status(0) , _status(0)
{ {
} }
RabbitMQ::~RabbitMQ() RabbitMQ::~RabbitMQ()
{ {
amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(_conn); amqp_destroy_connection(_conn);
} }
void RabbitMQ::init() void RabbitMQ::init()
{ {
struct timeval tval; struct timeval tval;
memset(&tval, 0, sizeof(struct timeval)); memset(&tval, 0, sizeof(struct timeval));
tval.tv_sec = 5; tval.tv_sec = 5;
fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
_conn = amqp_new_connection(); _conn = amqp_new_connection();
_socket = amqp_tcp_socket_new(_conn); _socket = amqp_tcp_socket_new(_conn);
if (!_socket) { if (!_socket) {
throw std::runtime_error("Can't create socket for RabbitMQ"); throw std::runtime_error("Can't create socket for RabbitMQ");
} }
_status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
if (_status) { if (_status) {
throw std::runtime_error("Can't connect to RabbitMQ"); throw std::runtime_error("Can't connect to RabbitMQ");
} }
amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
_mqc->username, _mqc->password); _mqc->username, _mqc->password);
if (r.reply_type != AMQP_RESPONSE_NORMAL) { if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("RabbitMQ Login Error"); throw std::runtime_error("RabbitMQ Login Error");
} }
static int chan = 0; static int chan = 0;
{ {
Mutex::Lock l(_chan_m); Mutex::Lock l(_chan_m);
_channel = ++chan; _channel = ++chan;
}
amqp_channel_open(_conn, _channel);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue " + std::string(_qName));
} }
amqp_channel_open(_conn, _channel);
r = amqp_get_rpc_reply(_conn);
if(r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error opening communication channel");
}
_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error declaring queue " + std::string(_qName));
}
amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
r = amqp_get_rpc_reply(_conn); r = amqp_get_rpc_reply(_conn);
if (r.reply_type != AMQP_RESPONSE_NORMAL) { if (r.reply_type != AMQP_RESPONSE_NORMAL) {
throw std::runtime_error("Error consuming queue " + std::string(_qName)); throw std::runtime_error("Error consuming queue " + std::string(_qName));
} }
fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
} }
std::string RabbitMQ::consume() std::string RabbitMQ::consume()
{ {
amqp_rpc_reply_t res; amqp_rpc_reply_t res;
amqp_envelope_t envelope; amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn); amqp_maybe_release_buffers(_conn);
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 1; timeout.tv_sec = 1;
timeout.tv_usec = 0; timeout.tv_usec = 0;
res = amqp_consume_message(_conn, &envelope, &timeout, 0); res = amqp_consume_message(_conn, &envelope, &timeout, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) { if (res.reply_type != AMQP_RESPONSE_NORMAL) {
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
// timeout waiting for message. Return empty string // timeout waiting for message. Return empty string
return ""; return "";
} else { } else {
throw std::runtime_error("Error getting message"); throw std::runtime_error("Error getting message");
} }
} }
std::string msg( std::string msg(
(const char*)envelope.message.body.bytes, (const char*)envelope.message.body.bytes,
envelope.message.body.len envelope.message.body.len
); );
amqp_destroy_envelope(&envelope); amqp_destroy_envelope(&envelope);
return msg; return msg;
} }
} }

View file

@ -23,16 +23,17 @@
* directly against ZeroTier software without disclosing the source code * directly against ZeroTier software without disclosing the source code
* of your own application. * of your own application.
*/ */
#ifndef ZT_CONTROLLER_RABBITMQ_HPP #ifndef ZT_CONTROLLER_RABBITMQ_HPP
#define ZT_CONTROLLER_RABBITMQ_HPP #define ZT_CONTROLLER_RABBITMQ_HPP
namespace ZeroTier namespace ZeroTier
{ {
struct MQConfig { struct MQConfig {
const char *host; const char *host;
int port; int port;
const char *username; const char *username;
const char *password; const char *password;
}; };
} }
@ -49,26 +50,25 @@ namespace ZeroTier
class RabbitMQ { class RabbitMQ {
public: public:
RabbitMQ(MQConfig *cfg, const char *queueName); RabbitMQ(MQConfig *cfg, const char *queueName);
~RabbitMQ(); ~RabbitMQ();
void init(); void init();
std::string consume(); std::string consume();
private: private:
MQConfig *_mqc; MQConfig *_mqc;
const char *_qName; const char *_qName;
amqp_socket_t *_socket; amqp_socket_t *_socket;
amqp_connection_state_t _conn; amqp_connection_state_t _conn;
amqp_queue_declare_ok_t *_q; amqp_queue_declare_ok_t *_q;
int _status; int _status;
int _channel; int _channel;
Mutex _chan_m; Mutex _chan_m;
}; };
} }