added struct for rabbitmq config

This commit is contained in:
Grant Limberg 2019-03-04 17:01:16 -08:00
parent 801401a6e7
commit 1f13374a4f
2 changed files with 57 additions and 14 deletions

View file

@ -74,13 +74,14 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier; using namespace ZeroTier;
PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort) PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc)
: DB(nc, myId, path) : DB(nc, myId, path)
, _ready(0) , _ready(0)
, _connected(1) , _connected(1)
, _run(1) , _run(1)
, _waitNoticePrinted(false) , _waitNoticePrinted(false)
, _listenPort(listenPort) , _listenPort(listenPort)
, _mqc(mqc)
{ {
_connString = std::string(path) + " application_name=controller_" +_myAddressStr; _connString = std::string(path) + " application_name=controller_" +_myAddressStr;
@ -601,6 +602,21 @@ void PostgreSQL::membersDbWatcher()
PQclear(res); res = NULL; PQclear(res); res = NULL;
if (this->_mqc != NULL) {
_membersWatcher_RabbitMQ();
} else {
_membersWatcher_Postgres(conn);
PQfinish(conn);
conn = NULL;
}
if (_run == 1) {
fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
exit(9);
}
}
void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
while(_run == 1) { while(_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) { if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
@ -627,12 +643,10 @@ void PostgreSQL::membersDbWatcher()
} }
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
PQfinish(conn); }
conn = NULL;
if (_run == 1) { void PostgreSQL::_membersWatcher_RabbitMQ() {
fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
exit(9);
}
} }
void PostgreSQL::networksDbWatcher() void PostgreSQL::networksDbWatcher()
@ -658,6 +672,21 @@ void PostgreSQL::networksDbWatcher()
PQclear(res); res = NULL; PQclear(res); res = NULL;
if (this->_mqc != NULL) {
_networksWatcher_RabbitMQ();
} else {
_networksWatcher_Postgres(conn);
PQfinish(conn);
conn = NULL;
}
if (_run == 1) {
fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
exit(8);
}
}
void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
while(_run == 1) { while(_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) { if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
@ -682,12 +711,10 @@ void PostgreSQL::networksDbWatcher()
} }
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
PQfinish(conn); }
conn = NULL;
if (_run == 1) { void PostgreSQL::_networksWatcher_RabbitMQ() {
fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
exit(8);
}
} }
void PostgreSQL::commitThread() void PostgreSQL::commitThread()

View file

@ -40,6 +40,14 @@ extern "C" {
namespace ZeroTier namespace ZeroTier
{ {
struct mq_config
{
const char *host;
int port;
const char *username;
const char *password;
};
/** /**
* A controller database driver that talks to PostgreSQL * A controller database driver that talks to PostgreSQL
* *
@ -49,7 +57,7 @@ namespace ZeroTier
class PostgreSQL : public DB class PostgreSQL : public DB
{ {
public: public:
PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort); PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL);
virtual ~PostgreSQL(); virtual ~PostgreSQL();
virtual bool waitForReady(); virtual bool waitForReady();
@ -70,7 +78,13 @@ private:
void initializeMembers(PGconn *conn); void initializeMembers(PGconn *conn);
void heartbeat(); void heartbeat();
void membersDbWatcher(); void membersDbWatcher();
void _membersWatcher_Postgres(PGconn *conn);
void _membersWatcher_RabbitMQ();
void networksDbWatcher(); void networksDbWatcher();
void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ();
void commitThread(); void commitThread();
void onlineNotificationThread(); void onlineNotificationThread();
@ -100,6 +114,8 @@ private:
mutable volatile bool _waitNoticePrinted; mutable volatile bool _waitNoticePrinted;
int _listenPort; int _listenPort;
mq_config *_mqc;
}; };
} }