diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index f83ebc9b9..7aefec9f3 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -937,15 +937,16 @@ void PostgreSQL::_membersWatcher_Postgres() { void PostgreSQL::_membersWatcher_Redis() { char buf[11] = {0}; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; + std::string lastID = "0"; fprintf(stderr, "Listening to member stream: %s\n", key.c_str()); while (_run == 1) { try { json tmp; std::unordered_map result; if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _cluster->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end())); } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _redis->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end())); } if (!result.empty()) { for (auto element : result) { @@ -982,6 +983,7 @@ void PostgreSQL::_membersWatcher_Redis() { } else { _redis->xdel(key, id); } + lastID = id; } } } @@ -1024,15 +1026,15 @@ void PostgreSQL::_networksWatcher_Postgres() { void PostgreSQL::_networksWatcher_Redis() { char buf[11] = {0}; std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; - + std::string lastID = "0"; while (_run == 1) { try { json tmp; std::unordered_map result; if (_rc->clusterMode) { - _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } else { - _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); + _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end())); } if (!result.empty()) { @@ -1070,6 +1072,7 @@ void PostgreSQL::_networksWatcher_Redis() { } else { _redis->xdel(key, id); } + lastID = id; } } }