diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index fb3867cac..356166fb7 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -735,50 +735,54 @@ void PostgreSQL::_membersWatcher_Redis() { std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; while (_run == 1) { - json tmp; - std::unordered_map result; - if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } - if (!result.empty()) { - for (auto element : result) { -#ifdef ZT_TRACE - fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); -#endif - for (auto rec : element.second) { - std::string id = rec.first; - auto attrs = rec.second; -#ifdef ZT_TRACE - fprintf(stdout, "Record ID: %s\n", id.c_str()); - fprintf(stdout, "attrs len: %lu\n", attrs.size()); -#endif - for (auto a : attrs) { -#ifdef ZT_TRACE - fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); -#endif - try { - tmp = json::parse(a.second); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _memberChanged(oldConfig,newConfig,(this->_ready >= 2)); + try { + json tmp; + std::unordered_map result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + if (!result.empty()) { + for (auto element : result) { + #ifdef ZT_TRACE + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + #endif + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; + #ifdef ZT_TRACE + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); + #endif + for (auto a : attrs) { + #ifdef ZT_TRACE + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + #endif + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } catch (...) { - fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } - if (_rc->clusterMode) { - _cluster->xdel(key, id); - } else { - _redis->xdel(key, id); + if (_rc->clusterMode) { + _cluster->xdel(key, id); + } else { + _redis->xdel(key, id); + } } } } + } catch (sw::redis::Error &e) { + fprintf(stderr, "Error in Redis members watcher: %s\n", e.what()); } } fprintf(stderr, "membersWatcher ended\n"); @@ -856,51 +860,55 @@ void PostgreSQL::_networksWatcher_Redis() { std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; while (_run == 1) { - json tmp; - std::unordered_map result; - if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); - } - - if (!result.empty()) { - for (auto element : result) { + try { + json tmp; + std::unordered_map result; + if (_rc->clusterMode) { + _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } else { + _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + } + + if (!result.empty()) { + for (auto element : result) { #ifdef ZT_TRACE - fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); + fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); #endif - for (auto rec : element.second) { - std::string id = rec.first; - auto attrs = rec.second; + for (auto rec : element.second) { + std::string id = rec.first; + auto attrs = rec.second; #ifdef ZT_TRACE - fprintf(stdout, "Record ID: %s\n", id.c_str()); - fprintf(stdout, "attrs len: %lu\n", attrs.size()); + fprintf(stdout, "Record ID: %s\n", id.c_str()); + fprintf(stdout, "attrs len: %lu\n", attrs.size()); #endif - for (auto a : attrs) { + for (auto a : attrs) { #ifdef ZT_TRACE - fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); + fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); #endif - try { - tmp = json::parse(a.second); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + try { + tmp = json::parse(a.second); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) { + fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } catch (...) { - fprintf(stderr, "json parse error in networkWatcher_Redis\n"); } - } - if (_rc->clusterMode) { - _cluster->xdel(key, id); - } else { - _redis->xdel(key, id); + if (_rc->clusterMode) { + _cluster->xdel(key, id); + } else { + _redis->xdel(key, id); + } } } } + } catch (sw::redis::Error &e) { + fprintf(stderr, "Error in Redis networks watcher: %s\n", e.what()); } } fprintf(stderr, "networksWatcher ended\n");