Merge branch 'adamierymenko-dev' into netcon

This commit is contained in:
Adam Ierymenko 2015-09-09 09:54:39 -07:00
commit a43c3fbf2e
16 changed files with 382 additions and 217 deletions

View file

@ -166,7 +166,7 @@ SqliteNetworkController::SqliteNetworkController(const char *dbPath) :
/* Node */ /* Node */
||(sqlite3_prepare_v2(_db,"SELECT identity FROM Node WHERE id = ?",-1,&_sGetNodeIdentity,(const char **)0) != SQLITE_OK) ||(sqlite3_prepare_v2(_db,"SELECT identity FROM Node WHERE id = ?",-1,&_sGetNodeIdentity,(const char **)0) != SQLITE_OK)
||(sqlite3_prepare_v2(_db,"INSERT INTO Node (id,identity) VALUES (?,?)",-1,&_sCreateNode,(const char **)0) != SQLITE_OK) ||(sqlite3_prepare_v2(_db,"INSERT OR REPLACE INTO Node (id,identity) VALUES (?,?)",-1,&_sCreateNode,(const char **)0) != SQLITE_OK)
/* Rule */ /* Rule */
||(sqlite3_prepare_v2(_db,"SELECT etherType FROM Rule WHERE networkId = ? AND \"action\" = 'accept'",-1,&_sGetEtherTypesFromRuleTable,(const char **)0) != SQLITE_OK) ||(sqlite3_prepare_v2(_db,"SELECT etherType FROM Rule WHERE networkId = ? AND \"action\" = 'accept'",-1,&_sGetEtherTypesFromRuleTable,(const char **)0) != SQLITE_OK)
@ -301,7 +301,7 @@ SqliteNetworkController::~SqliteNetworkController()
} }
} }
NetworkController::ResultCode SqliteNetworkController::doNetworkConfigRequest(const InetAddress &fromAddr,const Identity &signingId,const Identity &identity,uint64_t nwid,const Dictionary &metaData,uint64_t haveRevision,Dictionary &netconf) NetworkController::ResultCode SqliteNetworkController::doNetworkConfigRequest(const InetAddress &fromAddr,const Identity &signingId,const Identity &identity,uint64_t nwid,const Dictionary &metaData,Dictionary &netconf)
{ {
// Decode some stuff from metaData // Decode some stuff from metaData
const unsigned int clientMajorVersion = (unsigned int)metaData.getHexUInt(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0); const unsigned int clientMajorVersion = (unsigned int)metaData.getHexUInt(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0);
@ -870,6 +870,27 @@ unsigned int SqliteNetworkController::handleControlPlaneHttpPOST(
} }
addToNetworkRevision = 1; addToNetworkRevision = 1;
} }
} else if (!strcmp(j->u.object.values[k].name,"identity")) {
// Identity is technically an immutable field, but if the member's Node has
// no identity we allow it to be populated. This is primarily for migrating
// node data from another controller.
json_value *idstr = j->u.object.values[k].value;
if (idstr->type == json_string) {
sqlite3_reset(_sGetNodeIdentity);
sqlite3_bind_text(_sGetNodeIdentity,1,addrs,10,SQLITE_STATIC);
if ((sqlite3_step(_sGetNodeIdentity) == SQLITE_ROW)&&(!sqlite3_column_text(_sGetNodeIdentity,0))) {
try {
Identity id2(idstr->u.string.ptr);
if (id2) {
std::string idstr2(id2.toString(false)); // object must persist until after sqlite3_step() for SQLITE_STATIC
sqlite3_reset(_sCreateNode);
sqlite3_bind_text(_sCreateNode,1,addrs,10,SQLITE_STATIC);
sqlite3_bind_text(_sCreateNode,2,idstr2.c_str(),-1,SQLITE_STATIC);
sqlite3_step(_sCreateNode);
}
} catch ( ... ) {} // ignore invalid identities
}
}
} }
} }
@ -1355,8 +1376,44 @@ unsigned int SqliteNetworkController::_doCPGet(
sqlite3_bind_text(_sGetMember2,1,nwids,16,SQLITE_STATIC); sqlite3_bind_text(_sGetMember2,1,nwids,16,SQLITE_STATIC);
sqlite3_bind_text(_sGetMember2,2,addrs,10,SQLITE_STATIC); sqlite3_bind_text(_sGetMember2,2,addrs,10,SQLITE_STATIC);
if (sqlite3_step(_sGetMember2) == SQLITE_ROW) { if (sqlite3_step(_sGetMember2) == SQLITE_ROW) {
const char *memberIdStr = (const char *)sqlite3_column_text(_sGetMember2,3);
// If testSingingId is included in the URL or X-ZT1-TestSigningId in the headers
// and if it contains an identity with a secret portion, the resturned JSON
// will contain an extra field called _testConf. This will contain several
// fields that report the result of doNetworkConfigRequest() for this member.
std::string testFields;
{
Identity testOutputSigningId;
std::map<std::string,std::string>::const_iterator sid(urlArgs.find("testSigningId"));
if (sid != urlArgs.end()) {
testOutputSigningId.fromString(sid->second.c_str());
} else {
sid = headers.find("x-zt1-testsigningid");
if (sid != headers.end())
testOutputSigningId.fromString(sid->second.c_str());
}
if ((testOutputSigningId.hasPrivate())&&(memberIdStr)) {
Dictionary testNetconf;
NetworkController::ResultCode rc = this->doNetworkConfigRequest(
InetAddress(),
testOutputSigningId,
Identity(memberIdStr),
nwid,
Dictionary(), // TODO: allow passing of meta-data for testing
testNetconf);
char rcs[16];
Utils::snprintf(rcs,sizeof(rcs),"%d,\n",(int)rc);
testFields.append("\t\"_test\": {\n");
testFields.append("\t\t\"resultCode\": "); testFields.append(rcs);
testFields.append("\t\t\"result\": \""); testFields.append(_jsonEscape(testNetconf.toString().c_str()).c_str()); testFields.append("\"");
testFields.append("\t}\n");
}
}
Utils::snprintf(json,sizeof(json), Utils::snprintf(json,sizeof(json),
"{\n" "{\n%s"
"\t\"nwid\": \"%s\",\n" "\t\"nwid\": \"%s\",\n"
"\t\"address\": \"%s\",\n" "\t\"address\": \"%s\",\n"
"\t\"controllerInstanceId\": \"%s\",\n" "\t\"controllerInstanceId\": \"%s\",\n"
@ -1366,6 +1423,7 @@ unsigned int SqliteNetworkController::_doCPGet(
"\t\"clock\": %llu,\n" "\t\"clock\": %llu,\n"
"\t\"identity\": \"%s\",\n" "\t\"identity\": \"%s\",\n"
"\t\"ipAssignments\": [", "\t\"ipAssignments\": [",
testFields.c_str(),
nwids, nwids,
addrs, addrs,
_instanceId.c_str(), _instanceId.c_str(),
@ -1373,7 +1431,7 @@ unsigned int SqliteNetworkController::_doCPGet(
(sqlite3_column_int(_sGetMember2,1) > 0) ? "true" : "false", (sqlite3_column_int(_sGetMember2,1) > 0) ? "true" : "false",
(unsigned long long)sqlite3_column_int64(_sGetMember2,2), (unsigned long long)sqlite3_column_int64(_sGetMember2,2),
(unsigned long long)OSUtils::now(), (unsigned long long)OSUtils::now(),
_jsonEscape((const char *)sqlite3_column_text(_sGetMember2,3)).c_str()); _jsonEscape(memberIdStr).c_str());
responseBody = json; responseBody = json;
sqlite3_reset(_sGetIpAssignmentsForNode2); sqlite3_reset(_sGetIpAssignmentsForNode2);

View file

@ -54,7 +54,6 @@ public:
const Identity &identity, const Identity &identity,
uint64_t nwid, uint64_t nwid,
const Dictionary &metaData, const Dictionary &metaData,
uint64_t haveRevision,
Dictionary &netconf); Dictionary &netconf);
unsigned int handleControlPlaneHttpGET( unsigned int handleControlPlaneHttpGET(

View file

@ -166,6 +166,15 @@ public:
return _a; return _a;
} }
/**
* @return Hash code for use with Hashtable
*/
inline unsigned long hashCode() const
throw()
{
return (unsigned long)_a;
}
/** /**
* @return Hexadecimal string * @return Hexadecimal string
*/ */
@ -230,4 +239,3 @@ private:
} // namespace ZeroTier } // namespace ZeroTier
#endif #endif

View file

@ -30,6 +30,8 @@
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
#include <utility>
#include <algorithm>
namespace ZeroTier { namespace ZeroTier {
@ -37,8 +39,9 @@ namespace ZeroTier {
* A minimal hash table implementation for the ZeroTier core * A minimal hash table implementation for the ZeroTier core
* *
* This is not a drop-in replacement for STL containers, and has several * This is not a drop-in replacement for STL containers, and has several
* limitations. It's designed to be small and fast for use in the * limitations. Keys can be uint64_t or an object, and if the latter they
* ZeroTier core. * must implement a method called hashCode() that returns an unsigned long
* value that is evenly distributed.
*/ */
template<typename K,typename V> template<typename K,typename V>
class Hashtable class Hashtable
@ -181,10 +184,11 @@ public:
/** /**
* @return Vector of all keys * @return Vector of all keys
*/ */
inline typename std::vector<K> keys() inline typename std::vector<K> keys() const
{ {
typename std::vector<K> k; typename std::vector<K> k;
if (_s) { if (_s) {
k.reserve(_s);
for(unsigned long i=0;i<_bc;++i) { for(unsigned long i=0;i<_bc;++i) {
_Bucket *b = _t[i]; _Bucket *b = _t[i];
while (b) { while (b) {
@ -196,6 +200,45 @@ public:
return k; return k;
} }
/**
* Append all keys (in unspecified order) to the supplied vector or list
*
* @param v Vector, list, or other compliant container
* @tparam Type of V (generally inferred)
*/
template<typename C>
inline void appendKeys(C &v) const
{
if (_s) {
for(unsigned long i=0;i<_bc;++i) {
_Bucket *b = _t[i];
while (b) {
v.push_back(b->k);
b = b->next;
}
}
}
}
/**
* @return Vector of all entries (pairs of K,V)
*/
inline typename std::vector< std::pair<K,V> > entries() const
{
typename std::vector< std::pair<K,V> > k;
if (_s) {
k.reserve(_s);
for(unsigned long i=0;i<_bc;++i) {
_Bucket *b = _t[i];
while (b) {
k.push_back(std::pair<K,V>(b->k,b->v));
b = b->next;
}
}
}
return k;
}
/** /**
* @param k Key * @param k Key
* @return Pointer to value or NULL if not found * @return Pointer to value or NULL if not found
@ -212,6 +255,21 @@ public:
} }
inline const V *get(const K &k) const { return const_cast<Hashtable *>(this)->get(k); } inline const V *get(const K &k) const { return const_cast<Hashtable *>(this)->get(k); }
/**
* @param k Key to check
* @return True if key is present
*/
inline bool contains(const K &k) const
{
_Bucket *b = _t[_hc(k) % _bc];
while (b) {
if (b->k == k)
return true;
b = b->next;
}
return false;
}
/** /**
* @param k Key * @param k Key
* @return True if value was present * @return True if value was present
@ -315,9 +373,12 @@ private:
} }
static inline unsigned long _hc(const uint64_t i) static inline unsigned long _hc(const uint64_t i)
{ {
// NOTE: this is fine for network IDs, but might be bad for other kinds /* NOTE: this assumes that 'i' is evenly distributed, which is the case for
// of IDs if they are not evenly or randomly distributed. * packet IDs and network IDs -- the two use cases in ZT for uint64_t keys.
return (unsigned long)((i ^ (i >> 32)) * 2654435761ULL); * These values are also greater than 0xffffffff so they'll map onto a full
* bucket count just fine no matter what happens. Normally you'd want to
* hash an integer key index in a hash table. */
return (unsigned long)i;
} }
inline void _grow() inline void _grow()

View file

@ -662,7 +662,7 @@ bool IncomingPacket::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *RR,cons
const uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_NETWORK_ID); const uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_NETWORK_ID);
const unsigned int metaDataLength = at<uint16_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT_LEN); const unsigned int metaDataLength = at<uint16_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT_LEN);
const Dictionary metaData((const char *)field(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT,metaDataLength),metaDataLength); const Dictionary metaData((const char *)field(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT,metaDataLength),metaDataLength);
const uint64_t haveRevision = ((ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT + metaDataLength + 8) <= size()) ? at<uint64_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT + metaDataLength) : 0ULL; //const uint64_t haveRevision = ((ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT + metaDataLength + 8) <= size()) ? at<uint64_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT + metaDataLength) : 0ULL;
const unsigned int h = hops(); const unsigned int h = hops();
const uint64_t pid = packetId(); const uint64_t pid = packetId();
@ -670,7 +670,7 @@ bool IncomingPacket::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *RR,cons
if (RR->localNetworkController) { if (RR->localNetworkController) {
Dictionary netconf; Dictionary netconf;
switch(RR->localNetworkController->doNetworkConfigRequest((h > 0) ? InetAddress() : _remoteAddress,RR->identity,peer->identity(),nwid,metaData,haveRevision,netconf)) { switch(RR->localNetworkController->doNetworkConfigRequest((h > 0) ? InetAddress() : _remoteAddress,RR->identity,peer->identity(),nwid,metaData,netconf)) {
case NetworkController::NETCONF_QUERY_OK: { case NetworkController::NETCONF_QUERY_OK: {
const std::string netconfStr(netconf.toString()); const std::string netconfStr(netconf.toString());

View file

@ -92,7 +92,7 @@ Network::Network(const RuntimeEnvironment *renv,uint64_t nwid) :
com.deserialize2(p,e); com.deserialize2(p,e);
if (!com) if (!com)
break; break;
_membershipCertificates.insert(std::pair< Address,CertificateOfMembership >(com.issuedTo(),com)); _certInfo[com.issuedTo()].com = com;
} }
} }
} }
@ -125,21 +125,24 @@ Network::~Network()
clean(); clean();
std::string buf("ZTMCD0");
Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.mcerts",_id); Utils::snprintf(n,sizeof(n),"networks.d/%.16llx.mcerts",_id);
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
if ((!_config)||(_config->isPublic())||(_certInfo.empty())) {
if ((!_config)||(_config->isPublic())||(_membershipCertificates.size() == 0)) {
RR->node->dataStoreDelete(n); RR->node->dataStoreDelete(n);
return; } else {
std::string buf("ZTMCD0");
Hashtable< Address,_RemoteMemberCertificateInfo >::Iterator i(_certInfo);
Address *a = (Address *)0;
_RemoteMemberCertificateInfo *ci = (_RemoteMemberCertificateInfo *)0;
while (i.next(a,ci)) {
if (ci->com)
ci->com.serialize2(buf);
} }
for(std::map<Address,CertificateOfMembership>::iterator c(_membershipCertificates.begin());c!=_membershipCertificates.end();++c)
c->second.serialize2(buf);
RR->node->dataStorePut(n,buf,true); RR->node->dataStorePut(n,buf,true);
} }
} }
}
bool Network::subscribedToMulticastGroup(const MulticastGroup &mg,bool includeBridgedGroups) const bool Network::subscribedToMulticastGroup(const MulticastGroup &mg,bool includeBridgedGroups) const
{ {
@ -147,7 +150,7 @@ bool Network::subscribedToMulticastGroup(const MulticastGroup &mg,bool includeBr
if (std::binary_search(_myMulticastGroups.begin(),_myMulticastGroups.end(),mg)) if (std::binary_search(_myMulticastGroups.begin(),_myMulticastGroups.end(),mg))
return true; return true;
else if (includeBridgedGroups) else if (includeBridgedGroups)
return (_multicastGroupsBehindMe.find(mg) != _multicastGroupsBehindMe.end()); return _multicastGroupsBehindMe.contains(mg);
else return false; else return false;
} }
@ -237,7 +240,7 @@ void Network::requestConfiguration()
if (RR->localNetworkController) { if (RR->localNetworkController) {
SharedPtr<NetworkConfig> nconf(config2()); SharedPtr<NetworkConfig> nconf(config2());
Dictionary newconf; Dictionary newconf;
switch(RR->localNetworkController->doNetworkConfigRequest(InetAddress(),RR->identity,RR->identity,_id,Dictionary(),(nconf) ? nconf->revision() : (uint64_t)0,newconf)) { switch(RR->localNetworkController->doNetworkConfigRequest(InetAddress(),RR->identity,RR->identity,_id,Dictionary(),newconf)) {
case NetworkController::NETCONF_QUERY_OK: case NetworkController::NETCONF_QUERY_OK:
this->setConfiguration(newconf,true); this->setConfiguration(newconf,true);
return; return;
@ -284,11 +287,12 @@ bool Network::validateAndAddMembershipCertificate(const CertificateOfMembership
return false; return false;
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
CertificateOfMembership &old = _membershipCertificates[cert.issuedTo()];
// Nothing to do if the cert hasn't changed -- we get duplicates due to zealous cert pushing {
if (old == cert) const _RemoteMemberCertificateInfo *ci = _certInfo.get(cert.issuedTo());
return true; // but if it's a duplicate of one we already accepted, return is 'true' if ((ci)&&(ci->com == cert))
return true; // we already have it
}
// Check signature, log and return if cert is invalid // Check signature, log and return if cert is invalid
if (cert.signedBy() != controller()) { if (cert.signedBy() != controller()) {
@ -322,9 +326,8 @@ bool Network::validateAndAddMembershipCertificate(const CertificateOfMembership
} }
} }
// If we made it past authentication, update cert // If we made it past authentication, add or update cert in our cert info store
if (cert.revision() != old.revision()) _certInfo[cert.issuedTo()].com = cert;
old = cert;
return true; return true;
} }
@ -333,9 +336,9 @@ bool Network::peerNeedsOurMembershipCertificate(const Address &to,uint64_t now)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
if ((_config)&&(!_config->isPublic())&&(_config->com())) { if ((_config)&&(!_config->isPublic())&&(_config->com())) {
uint64_t &lastPushed = _lastPushedMembershipCertificate[to]; _RemoteMemberCertificateInfo &ci = _certInfo[to];
if ((now - lastPushed) > (ZT_NETWORK_AUTOCONF_DELAY / 2)) { if ((now - ci.lastPushed) > (ZT_NETWORK_AUTOCONF_DELAY / 2)) {
lastPushed = now; ci.lastPushed = now;
return true; return true;
} }
} }
@ -352,31 +355,28 @@ void Network::clean()
if ((_config)&&(_config->isPublic())) { if ((_config)&&(_config->isPublic())) {
// Open (public) networks do not track certs or cert pushes at all. // Open (public) networks do not track certs or cert pushes at all.
_membershipCertificates.clear(); _certInfo.clear();
_lastPushedMembershipCertificate.clear();
} else if (_config) { } else if (_config) {
// Clean certificates that are no longer valid from the cache. // Clean obsolete entries from private network cert info table
for(std::map<Address,CertificateOfMembership>::iterator c=(_membershipCertificates.begin());c!=_membershipCertificates.end();) { Hashtable< Address,_RemoteMemberCertificateInfo >::Iterator i(_certInfo);
if (_config->com().agreesWith(c->second)) Address *a = (Address *)0;
++c; _RemoteMemberCertificateInfo *ci = (_RemoteMemberCertificateInfo *)0;
else _membershipCertificates.erase(c++); const uint64_t forgetIfBefore = now - (ZT_PEER_ACTIVITY_TIMEOUT * 16); // arbitrary reasonable cutoff
} while (i.next(a,ci)) {
if ((ci->lastPushed < forgetIfBefore)&&(!ci->com.agreesWith(_config->com())))
// Clean entries from the last pushed tracking map if they're so old as _certInfo.erase(*a);
// to be no longer relevant.
uint64_t forgetIfBefore = now - (ZT_PEER_ACTIVITY_TIMEOUT * 16); // arbitrary reasonable cutoff
for(std::map<Address,uint64_t>::iterator lp(_lastPushedMembershipCertificate.begin());lp!=_lastPushedMembershipCertificate.end();) {
if (lp->second < forgetIfBefore)
_lastPushedMembershipCertificate.erase(lp++);
else ++lp;
} }
} }
// Clean learned multicast groups if we haven't heard from them in a while // Clean learned multicast groups if we haven't heard from them in a while
for(std::map<MulticastGroup,uint64_t>::iterator mg(_multicastGroupsBehindMe.begin());mg!=_multicastGroupsBehindMe.end();) { {
if ((now - mg->second) > (ZT_MULTICAST_LIKE_EXPIRE * 2)) Hashtable< MulticastGroup,uint64_t >::Iterator i(_multicastGroupsBehindMe);
_multicastGroupsBehindMe.erase(mg++); MulticastGroup *mg = (MulticastGroup *)0;
else ++mg; uint64_t *ts = (uint64_t *)0;
while (i.next(mg,ts)) {
if ((now - *ts) > (ZT_MULTICAST_LIKE_EXPIRE * 2))
_multicastGroupsBehindMe.erase(*mg);
}
} }
} }
@ -385,22 +385,34 @@ void Network::learnBridgeRoute(const MAC &mac,const Address &addr)
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
_remoteBridgeRoutes[mac] = addr; _remoteBridgeRoutes[mac] = addr;
// If _remoteBridgeRoutes exceeds sanity limit, trim worst offenders until below -- denial of service circuit breaker // Anti-DOS circuit breaker to prevent nodes from spamming us with absurd numbers of bridge routes
while (_remoteBridgeRoutes.size() > ZT_MAX_BRIDGE_ROUTES) { while (_remoteBridgeRoutes.size() > ZT_MAX_BRIDGE_ROUTES) {
std::map<Address,unsigned long> counts; Hashtable< Address,unsigned long > counts;
Address maxAddr; Address maxAddr;
unsigned long maxCount = 0; unsigned long maxCount = 0;
for(std::map<MAC,Address>::iterator br(_remoteBridgeRoutes.begin());br!=_remoteBridgeRoutes.end();++br) {
unsigned long c = ++counts[br->second]; MAC *m = (MAC *)0;
Address *a = (Address *)0;
// Find the address responsible for the most entries
{
Hashtable<MAC,Address>::Iterator i(_remoteBridgeRoutes);
while (i.next(m,a)) {
const unsigned long c = ++counts[*a];
if (c > maxCount) { if (c > maxCount) {
maxCount = c; maxCount = c;
maxAddr = br->second; maxAddr = *a;
} }
} }
for(std::map<MAC,Address>::iterator br(_remoteBridgeRoutes.begin());br!=_remoteBridgeRoutes.end();) { }
if (br->second == maxAddr)
_remoteBridgeRoutes.erase(br++); // Kill this address from our table, since it's most likely spamming us
else ++br; {
Hashtable<MAC,Address>::Iterator i(_remoteBridgeRoutes);
while (i.next(m,a)) {
if (*a == maxAddr)
_remoteBridgeRoutes.erase(*m);
}
} }
} }
} }
@ -408,8 +420,8 @@ void Network::learnBridgeRoute(const MAC &mac,const Address &addr)
void Network::learnBridgedMulticastGroup(const MulticastGroup &mg,uint64_t now) void Network::learnBridgedMulticastGroup(const MulticastGroup &mg,uint64_t now)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
unsigned long tmp = (unsigned long)_multicastGroupsBehindMe.size(); const unsigned long tmp = (unsigned long)_multicastGroupsBehindMe.size();
_multicastGroupsBehindMe[mg] = now; _multicastGroupsBehindMe.set(mg,now);
if (tmp != _multicastGroupsBehindMe.size()) if (tmp != _multicastGroupsBehindMe.size())
_announceMulticastGroups(); _announceMulticastGroups();
} }
@ -490,12 +502,10 @@ bool Network::_isAllowed(const Address &peer) const
return false; return false;
if (_config->isPublic()) if (_config->isPublic())
return true; return true;
const _RemoteMemberCertificateInfo *ci = _certInfo.get(peer);
std::map<Address,CertificateOfMembership>::const_iterator pc(_membershipCertificates.find(peer)); if (!ci)
if (pc == _membershipCertificates.end()) return false;
return false; // no certificate on file return _config->com().agreesWith(ci->com);
return _config->com().agreesWith(pc->second); // is other cert valid against ours?
} catch (std::exception &exc) { } catch (std::exception &exc) {
TRACE("isAllowed() check failed for peer %s: unexpected exception: %s",peer.toString().c_str(),exc.what()); TRACE("isAllowed() check failed for peer %s: unexpected exception: %s",peer.toString().c_str(),exc.what());
} catch ( ... ) { } catch ( ... ) {
@ -510,8 +520,7 @@ std::vector<MulticastGroup> Network::_allMulticastGroups() const
std::vector<MulticastGroup> mgs; std::vector<MulticastGroup> mgs;
mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1); mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1);
mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end()); mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end());
for(std::map< MulticastGroup,uint64_t >::const_iterator i(_multicastGroupsBehindMe.begin());i!=_multicastGroupsBehindMe.end();++i) _multicastGroupsBehindMe.appendKeys(mgs);
mgs.push_back(i->first);
if ((_config)&&(_config->enableBroadcast())) if ((_config)&&(_config->enableBroadcast()))
mgs.push_back(Network::BROADCAST); mgs.push_back(Network::BROADCAST);
std::sort(mgs.begin(),mgs.end()); std::sort(mgs.begin(),mgs.end());

View file

@ -40,6 +40,7 @@
#include "Constants.hpp" #include "Constants.hpp"
#include "NonCopyable.hpp" #include "NonCopyable.hpp"
#include "Hashtable.hpp"
#include "Address.hpp" #include "Address.hpp"
#include "Mutex.hpp" #include "Mutex.hpp"
#include "SharedPtr.hpp" #include "SharedPtr.hpp"
@ -297,10 +298,10 @@ public:
inline Address findBridgeTo(const MAC &mac) const inline Address findBridgeTo(const MAC &mac) const
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
std::map<MAC,Address>::const_iterator br(_remoteBridgeRoutes.find(mac)); const Address *const br = _remoteBridgeRoutes.get(mac);
if (br == _remoteBridgeRoutes.end()) if (br)
return *br;
return Address(); return Address();
return br->second;
} }
/** /**
@ -346,6 +347,13 @@ public:
inline bool operator>=(const Network &n) const throw() { return (_id >= n._id); } inline bool operator>=(const Network &n) const throw() { return (_id >= n._id); }
private: private:
struct _RemoteMemberCertificateInfo
{
_RemoteMemberCertificateInfo() : com(),lastPushed(0) {}
CertificateOfMembership com; // remote member's COM
uint64_t lastPushed; // when did we last push ours to them?
};
ZT1_VirtualNetworkStatus _status() const; ZT1_VirtualNetworkStatus _status() const;
void _externalConfig(ZT1_VirtualNetworkConfig *ec) const; // assumes _lock is locked void _externalConfig(ZT1_VirtualNetworkConfig *ec) const; // assumes _lock is locked
bool _isAllowed(const Address &peer) const; bool _isAllowed(const Address &peer) const;
@ -358,13 +366,11 @@ private:
volatile bool _enabled; volatile bool _enabled;
volatile bool _portInitialized; volatile bool _portInitialized;
std::vector< MulticastGroup > _myMulticastGroups; // multicast groups that we belong to including those behind us (updated periodically) std::vector< MulticastGroup > _myMulticastGroups; // multicast groups that we belong to (according to tap)
std::map< MulticastGroup,uint64_t > _multicastGroupsBehindMe; // multicast groups bridged to us and when we last saw activity on each Hashtable< MulticastGroup,uint64_t > _multicastGroupsBehindMe; // multicast groups that seem to be behind us and when we last saw them (if we are a bridge)
Hashtable< MAC,Address > _remoteBridgeRoutes; // remote addresses where given MACs are reachable (for tracking devices behind remote bridges)
std::map<MAC,Address> _remoteBridgeRoutes; // remote addresses where given MACs are reachable Hashtable< Address,_RemoteMemberCertificateInfo > _certInfo;
std::map<Address,CertificateOfMembership> _membershipCertificates; // Other members' certificates of membership
std::map<Address,uint64_t> _lastPushedMembershipCertificate; // When did we last push our certificate to each remote member?
SharedPtr<NetworkConfig> _config; // Most recent network configuration, which is an immutable value-object SharedPtr<NetworkConfig> _config; // Most recent network configuration, which is an immutable value-object
volatile uint64_t _lastConfigUpdate; volatile uint64_t _lastConfigUpdate;

View file

@ -75,7 +75,6 @@ public:
* @param identity Originating peer ZeroTier identity * @param identity Originating peer ZeroTier identity
* @param nwid 64-bit network ID * @param nwid 64-bit network ID
* @param metaData Meta-data bundled with request (empty if none) * @param metaData Meta-data bundled with request (empty if none)
* @param haveRevision Network revision ID sent by requesting peer or 0 if none
* @param result Dictionary to receive resulting signed netconf on success * @param result Dictionary to receive resulting signed netconf on success
* @return Returns NETCONF_QUERY_OK if result dictionary is valid, or an error code on error * @return Returns NETCONF_QUERY_OK if result dictionary is valid, or an error code on error
*/ */
@ -85,7 +84,6 @@ public:
const Identity &identity, const Identity &identity,
uint64_t nwid, uint64_t nwid,
const Dictionary &metaData, const Dictionary &metaData,
uint64_t haveRevision,
Dictionary &result) = 0; Dictionary &result) = 0;
}; };

View file

@ -355,7 +355,8 @@ void Node::status(ZT1_NodeStatus *status) const
ZT1_PeerList *Node::peers() const ZT1_PeerList *Node::peers() const
{ {
std::map< Address,SharedPtr<Peer> > peers(RR->topology->allPeers()); std::vector< std::pair< Address,SharedPtr<Peer> > > peers(RR->topology->allPeers());
std::sort(peers.begin(),peers.end());
char *buf = (char *)::malloc(sizeof(ZT1_PeerList) + (sizeof(ZT1_Peer) * peers.size())); char *buf = (char *)::malloc(sizeof(ZT1_PeerList) + (sizeof(ZT1_Peer) * peers.size()));
if (!buf) if (!buf)
@ -364,7 +365,7 @@ ZT1_PeerList *Node::peers() const
pl->peers = (ZT1_Peer *)(buf + sizeof(ZT1_PeerList)); pl->peers = (ZT1_Peer *)(buf + sizeof(ZT1_PeerList));
pl->peerCount = 0; pl->peerCount = 0;
for(std::map< Address,SharedPtr<Peer> >::iterator pi(peers.begin());pi!=peers.end();++pi) { for(std::vector< std::pair< Address,SharedPtr<Peer> > >::iterator pi(peers.begin());pi!=peers.end();++pi) {
ZT1_Peer *p = &(pl->peers[pl->peerCount++]); ZT1_Peer *p = &(pl->peers[pl->peerCount++]);
p->address = pi->second->address().toInt(); p->address = pi->second->address().toInt();
p->lastUnicastFrame = pi->second->lastUnicastFrame(); p->lastUnicastFrame = pi->second->lastUnicastFrame();

View file

@ -40,6 +40,7 @@
#include "../include/ZeroTierOne.h" #include "../include/ZeroTierOne.h"
#include "RuntimeEnvironment.hpp" #include "RuntimeEnvironment.hpp"
#include "CertificateOfMembership.hpp"
#include "RemotePath.hpp" #include "RemotePath.hpp"
#include "Address.hpp" #include "Address.hpp"
#include "Utils.hpp" #include "Utils.hpp"

View file

@ -107,10 +107,14 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
// Erase all entries (other than this one) for this scope to prevent thrashing // Erase all entries (other than this one) for this scope to prevent thrashing
// Note: we should probably not use 'entry' after this // Note: we should probably not use 'entry' after this
for(std::map< PhySurfaceKey,PhySurfaceEntry >::iterator p(_phy.begin());p!=_phy.end();) { {
if ((p->first.reporter != reporter)&&(p->first.scope == scope)) Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
_phy.erase(p++); PhySurfaceKey *k = (PhySurfaceKey *)0;
else ++p; PhySurfaceEntry *e = (PhySurfaceEntry *)0;
while (i.next(k,e)) {
if ((k->reporter != reporter)&&(k->scope == scope))
_phy.erase(*k);
}
} }
_ResetWithinScope rset(RR,now,(InetAddress::IpScope)scope); _ResetWithinScope rset(RR,now,(InetAddress::IpScope)scope);
@ -140,26 +144,13 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
void SelfAwareness::clean(uint64_t now) void SelfAwareness::clean(uint64_t now)
{ {
Mutex::Lock _l(_phy_m); Mutex::Lock _l(_phy_m);
for(std::map< PhySurfaceKey,PhySurfaceEntry >::iterator p(_phy.begin());p!=_phy.end();) { Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
if ((now - p->second.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) PhySurfaceKey *k = (PhySurfaceKey *)0;
_phy.erase(p++); PhySurfaceEntry *e = (PhySurfaceEntry *)0;
else ++p; while (i.next(k,e)) {
if ((now - e->ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT)
_phy.erase(*k);
} }
} }
bool SelfAwareness::areGlobalIPv4PortsRandomized() const
{
int port = 0;
Mutex::Lock _l(_phy_m);
for(std::map< PhySurfaceKey,PhySurfaceEntry >::const_iterator p(_phy.begin());p!=_phy.end();++p) {
if ((p->first.scope == InetAddress::IP_SCOPE_GLOBAL)&&(p->second.mySurface.ss_family == AF_INET)) {
const int tmp = (int)p->second.mySurface.port();
if ((port)&&(tmp != port))
return true;
else port = tmp;
}
}
return false;
}
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -28,10 +28,9 @@
#ifndef ZT_SELFAWARENESS_HPP #ifndef ZT_SELFAWARENESS_HPP
#define ZT_SELFAWARENESS_HPP #define ZT_SELFAWARENESS_HPP
#include <map> #include "Constants.hpp"
#include <vector>
#include "InetAddress.hpp" #include "InetAddress.hpp"
#include "Hashtable.hpp"
#include "Address.hpp" #include "Address.hpp"
#include "Mutex.hpp" #include "Mutex.hpp"
@ -66,17 +65,14 @@ public:
*/ */
void clean(uint64_t now); void clean(uint64_t now);
/**
* @return True if our external (global scope) IPv4 ports appear to be randomized by a NAT device
*/
bool areGlobalIPv4PortsRandomized() const;
private: private:
struct PhySurfaceKey struct PhySurfaceKey
{ {
Address reporter; Address reporter;
InetAddress::IpScope scope; InetAddress::IpScope scope;
inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {} PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {}
inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); } inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); }
@ -93,7 +89,7 @@ private:
const RuntimeEnvironment *RR; const RuntimeEnvironment *RR;
std::map< PhySurfaceKey,PhySurfaceEntry > _phy; Hashtable< PhySurfaceKey,PhySurfaceEntry > _phy;
Mutex _phy_m; Mutex _phy_m;
}; };

View file

@ -67,7 +67,10 @@ static const char *etherTypeName(const unsigned int etherType)
Switch::Switch(const RuntimeEnvironment *renv) : Switch::Switch(const RuntimeEnvironment *renv) :
RR(renv), RR(renv),
_lastBeaconResponse(0) _lastBeaconResponse(0),
_outstandingWhoisRequests(32),
_defragQueue(32),
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
{ {
} }
@ -291,7 +294,7 @@ void Switch::send(const Packet &packet,bool encrypt,uint64_t nwid)
if (!_trySend(packet,encrypt,nwid)) { if (!_trySend(packet,encrypt,nwid)) {
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
_txQueue.insert(std::pair< Address,TXQueueEntry >(packet.destination(),TXQueueEntry(RR->node->now(),packet,encrypt,nwid))); _txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt,nwid));
} }
} }
@ -309,31 +312,18 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force)
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
if (!(cg.first))
return false;
if (cg.first.ipScope() != cg.second.ipScope())
return false;
// Addresses are sorted in key for last unite attempt map for order
// invariant lookup: (p1,p2) == (p2,p1)
Array<Address,2> uniteKey;
if (p1 >= p2) {
uniteKey[0] = p2;
uniteKey[1] = p1;
} else {
uniteKey[0] = p1;
uniteKey[1] = p2;
}
{ {
Mutex::Lock _l(_lastUniteAttempt_m); Mutex::Lock _l(_lastUniteAttempt_m);
std::map< Array< Address,2 >,uint64_t >::const_iterator e(_lastUniteAttempt.find(uniteKey)); uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)];
if ((!force)&&(e != _lastUniteAttempt.end())&&((now - e->second) < ZT_MIN_UNITE_INTERVAL)) if (((now - luts) < ZT_MIN_UNITE_INTERVAL)&&(!force))
return false; return false;
else _lastUniteAttempt[uniteKey] = now; luts = now;
} }
std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope()))
return false;
TRACE("unite: %s(%s) <> %s(%s)",p1.toString().c_str(),cg.second.toString().c_str(),p2.toString().c_str(),cg.first.toString().c_str()); TRACE("unite: %s(%s) <> %s(%s)",p1.toString().c_str(),cg.second.toString().c_str(),p2.toString().c_str(),cg.first.toString().c_str());
/* Tell P1 where to find P2 and vice versa, sending the packets to P1 and /* Tell P1 where to find P2 and vice versa, sending the packets to P1 and
@ -402,10 +392,13 @@ void Switch::requestWhois(const Address &addr)
bool inserted = false; bool inserted = false;
{ {
Mutex::Lock _l(_outstandingWhoisRequests_m); Mutex::Lock _l(_outstandingWhoisRequests_m);
std::pair< std::map< Address,WhoisRequest >::iterator,bool > entry(_outstandingWhoisRequests.insert(std::pair<Address,WhoisRequest>(addr,WhoisRequest()))); WhoisRequest &r = _outstandingWhoisRequests[addr];
if ((inserted = entry.second)) if (r.lastSent) {
entry.first->second.lastSent = RR->node->now(); r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout
entry.first->second.retries = 0; // reset retry count if entry already existed } else {
r.lastSent = RR->node->now();
inserted = true;
}
} }
if (inserted) if (inserted)
_sendWhoisRequest(addr,(const Address *)0,0); _sendWhoisRequest(addr,(const Address *)0,0);
@ -435,11 +428,12 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
{ // finish sending any packets waiting on peer's public key / identity { // finish sending any packets waiting on peer's public key / identity
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
std::pair< std::multimap< Address,TXQueueEntry >::iterator,std::multimap< Address,TXQueueEntry >::iterator > waitingTxQueueItems(_txQueue.equal_range(peer->address())); for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
for(std::multimap< Address,TXQueueEntry >::iterator txi(waitingTxQueueItems.first);txi!=waitingTxQueueItems.second;) { if (txi->dest == peer->address()) {
if (_trySend(txi->second.packet,txi->second.encrypt,txi->second.nwid)) if (_trySend(txi->packet,txi->encrypt,txi->nwid))
_txQueue.erase(txi++); _txQueue.erase(txi++);
else ++txi; else ++txi;
} else ++txi;
} }
} }
} }
@ -486,36 +480,37 @@ unsigned long Switch::doTimerTasks(uint64_t now)
{ // Retry outstanding WHOIS requests { // Retry outstanding WHOIS requests
Mutex::Lock _l(_outstandingWhoisRequests_m); Mutex::Lock _l(_outstandingWhoisRequests_m);
for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) { Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests);
unsigned long since = (unsigned long)(now - i->second.lastSent); Address *a = (Address *)0;
WhoisRequest *r = (WhoisRequest *)0;
while (i.next(a,r)) {
const unsigned long since = (unsigned long)(now - r->lastSent);
if (since >= ZT_WHOIS_RETRY_DELAY) { if (since >= ZT_WHOIS_RETRY_DELAY) {
if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) { if (r->retries >= ZT_MAX_WHOIS_RETRIES) {
TRACE("WHOIS %s timed out",i->first.toString().c_str()); TRACE("WHOIS %s timed out",a->toString().c_str());
_outstandingWhoisRequests.erase(i++); _outstandingWhoisRequests.erase(*a);
continue;
} else { } else {
i->second.lastSent = now; r->lastSent = now;
i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries); r->peersConsulted[r->retries] = _sendWhoisRequest(*a,r->peersConsulted,r->retries);
++i->second.retries; ++r->retries;
TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries); TRACE("WHOIS %s (retry %u)",a->toString().c_str(),r->retries);
nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY); nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
} }
} else { } else {
nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since); nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
} }
++i;
} }
} }
{ // Time out TX queue packets that never got WHOIS lookups or other info. { // Time out TX queue packets that never got WHOIS lookups or other info.
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) { for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (_trySend(i->second.packet,i->second.encrypt,i->second.nwid)) if (_trySend(txi->packet,txi->encrypt,txi->nwid))
_txQueue.erase(i++); _txQueue.erase(txi++);
else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str()); TRACE("TX %s -> %s timed out",txi->packet.source().toString().c_str(),txi->packet.destination().toString().c_str());
_txQueue.erase(i++); _txQueue.erase(txi++);
} else ++i; } else ++txi;
} }
} }
@ -531,11 +526,25 @@ unsigned long Switch::doTimerTasks(uint64_t now)
{ // Time out packets that didn't get all their fragments. { // Time out packets that didn't get all their fragments.
Mutex::Lock _l(_defragQueue_m); Mutex::Lock _l(_defragQueue_m);
for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) { Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue);
if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) { uint64_t *packetId = (uint64_t *)0;
TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first); DefragQueueEntry *qe = (DefragQueueEntry *)0;
_defragQueue.erase(i++); while (i.next(packetId,qe)) {
} else ++i; if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId);
_defragQueue.erase(*packetId);
}
}
}
{ // Remove really old last unite attempt entries to keep table size controlled
Mutex::Lock _l(_lastUniteAttempt_m);
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
_LastUniteKey *k = (_LastUniteKey *)0;
uint64_t *v = (uint64_t *)0;
while (i.next(k,v)) {
if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 16))
_lastUniteAttempt.erase(*k);
} }
} }
@ -577,32 +586,31 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void
// seeing a Packet::Fragment? // seeing a Packet::Fragment?
Mutex::Lock _l(_defragQueue_m); Mutex::Lock _l(_defragQueue_m);
std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid)); DefragQueueEntry &dq = _defragQueue[pid];
if (dqe == _defragQueue.end()) { if (!dq.creationTime) {
// We received a Packet::Fragment without its head, so queue it and wait // We received a Packet::Fragment without its head, so queue it and wait
DefragQueueEntry &dq = _defragQueue[pid];
dq.creationTime = RR->node->now(); dq.creationTime = RR->node->now();
dq.frags[fno - 1] = fragment; dq.frags[fno - 1] = fragment;
dq.totalFragments = tf; // total fragment count is known dq.totalFragments = tf; // total fragment count is known
dq.haveFragments = 1 << fno; // we have only this fragment dq.haveFragments = 1 << fno; // we have only this fragment
//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
} else if (!(dqe->second.haveFragments & (1 << fno))) { } else if (!(dq.haveFragments & (1 << fno))) {
// We have other fragments and maybe the head, so add this one and check // We have other fragments and maybe the head, so add this one and check
dqe->second.frags[fno - 1] = fragment; dq.frags[fno - 1] = fragment;
dqe->second.totalFragments = tf; dq.totalFragments = tf;
//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str()); //TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
if (Utils::countBits(dqe->second.haveFragments |= (1 << fno)) == tf) { if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) {
// We have all fragments -- assemble and process full Packet // We have all fragments -- assemble and process full Packet
//TRACE("packet %.16llx is complete, assembling and processing...",pid); //TRACE("packet %.16llx is complete, assembling and processing...",pid);
SharedPtr<IncomingPacket> packet(dqe->second.frag0); SharedPtr<IncomingPacket> packet(dq.frag0);
for(unsigned int f=1;f<tf;++f) for(unsigned int f=1;f<tf;++f)
packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
_defragQueue.erase(dqe); _defragQueue.erase(pid); // dq no longer valid after this
if (!packet->tryDecode(RR)) { if (!packet->tryDecode(RR)) {
Mutex::Lock _l(_rxQueue_m); Mutex::Lock _l(_rxQueue_m);
@ -645,26 +653,27 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
uint64_t pid = packet->packetId(); uint64_t pid = packet->packetId();
Mutex::Lock _l(_defragQueue_m); Mutex::Lock _l(_defragQueue_m);
std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid));
if (dqe == _defragQueue.end()) {
// If we have no other fragments yet, create an entry and save the head
DefragQueueEntry &dq = _defragQueue[pid]; DefragQueueEntry &dq = _defragQueue[pid];
if (!dq.creationTime) {
// If we have no other fragments yet, create an entry and save the head
dq.creationTime = RR->node->now(); dq.creationTime = RR->node->now();
dq.frag0 = packet; dq.frag0 = packet;
dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
dq.haveFragments = 1; // head is first bit (left to right) dq.haveFragments = 1; // head is first bit (left to right)
//TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str()); //TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
} else if (!(dqe->second.haveFragments & 1)) { } else if (!(dq.haveFragments & 1)) {
// If we have other fragments but no head, see if we are complete with the head // If we have other fragments but no head, see if we are complete with the head
if ((dqe->second.totalFragments)&&(Utils::countBits(dqe->second.haveFragments |= 1) == dqe->second.totalFragments)) {
if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) {
// We have all fragments -- assemble and process full Packet // We have all fragments -- assemble and process full Packet
//TRACE("packet %.16llx is complete, assembling and processing...",pid); //TRACE("packet %.16llx is complete, assembling and processing...",pid);
// packet already contains head, so append fragments // packet already contains head, so append fragments
for(unsigned int f=1;f<dqe->second.totalFragments;++f) for(unsigned int f=1;f<dq.totalFragments;++f)
packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength()); packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
_defragQueue.erase(dqe); _defragQueue.erase(pid); // dq no longer valid after this
if (!packet->tryDecode(RR)) { if (!packet->tryDecode(RR)) {
Mutex::Lock _l(_rxQueue_m); Mutex::Lock _l(_rxQueue_m);
@ -672,7 +681,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
} }
} else { } else {
// Still waiting on more fragments, so queue the head // Still waiting on more fragments, so queue the head
dqe->second.frag0 = packet; dq.frag0 = packet;
} }
} // else this is a duplicate head, ignore } // else this is a duplicate head, ignore
} else { } else {

View file

@ -45,6 +45,7 @@
#include "Network.hpp" #include "Network.hpp"
#include "SharedPtr.hpp" #include "SharedPtr.hpp"
#include "IncomingPacket.hpp" #include "IncomingPacket.hpp"
#include "Hashtable.hpp"
/* Ethernet frame types that might be relevant to us */ /* Ethernet frame types that might be relevant to us */
#define ZT_ETHERTYPE_IPV4 0x0800 #define ZT_ETHERTYPE_IPV4 0x0800
@ -189,49 +190,70 @@ private:
// Outsanding WHOIS requests and how many retries they've undergone // Outsanding WHOIS requests and how many retries they've undergone
struct WhoisRequest struct WhoisRequest
{ {
WhoisRequest() : lastSent(0),retries(0) {}
uint64_t lastSent; uint64_t lastSent;
Address peersConsulted[ZT_MAX_WHOIS_RETRIES]; // by retry Address peersConsulted[ZT_MAX_WHOIS_RETRIES]; // by retry
unsigned int retries; // 0..ZT_MAX_WHOIS_RETRIES unsigned int retries; // 0..ZT_MAX_WHOIS_RETRIES
}; };
std::map< Address,WhoisRequest > _outstandingWhoisRequests; Hashtable< Address,WhoisRequest > _outstandingWhoisRequests;
Mutex _outstandingWhoisRequests_m; Mutex _outstandingWhoisRequests_m;
// Packet defragmentation queue -- comes before RX queue in path // Packet defragmentation queue -- comes before RX queue in path
struct DefragQueueEntry struct DefragQueueEntry
{ {
DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {}
uint64_t creationTime; uint64_t creationTime;
SharedPtr<IncomingPacket> frag0; SharedPtr<IncomingPacket> frag0;
Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1]; Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
unsigned int totalFragments; // 0 if only frag0 received, waiting for frags unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
uint32_t haveFragments; // bit mask, LSB to MSB uint32_t haveFragments; // bit mask, LSB to MSB
}; };
std::map< uint64_t,DefragQueueEntry > _defragQueue; Hashtable< uint64_t,DefragQueueEntry > _defragQueue;
Mutex _defragQueue_m; Mutex _defragQueue_m;
// ZeroTier-layer RX queue of incoming packets in the process of being decoded // ZeroTier-layer RX queue of incoming packets in the process of being decoded
std::list< SharedPtr<IncomingPacket> > _rxQueue; std::list< SharedPtr<IncomingPacket> > _rxQueue;
Mutex _rxQueue_m; Mutex _rxQueue_m;
// ZeroTier-layer TX queue by destination ZeroTier address // ZeroTier-layer TX queue entry
struct TXQueueEntry struct TXQueueEntry
{ {
TXQueueEntry() {} TXQueueEntry() {}
TXQueueEntry(uint64_t ct,const Packet &p,bool enc,uint64_t nw) : TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc,uint64_t nw) :
dest(d),
creationTime(ct), creationTime(ct),
nwid(nw), nwid(nw),
packet(p), packet(p),
encrypt(enc) {} encrypt(enc) {}
Address dest;
uint64_t creationTime; uint64_t creationTime;
uint64_t nwid; uint64_t nwid;
Packet packet; // unencrypted/unMAC'd packet -- this is done at send time Packet packet; // unencrypted/unMAC'd packet -- this is done at send time
bool encrypt; bool encrypt;
}; };
std::multimap< Address,TXQueueEntry > _txQueue; std::list< TXQueueEntry > _txQueue;
Mutex _txQueue_m; Mutex _txQueue_m;
// Tracks sending of VERB_RENDEZVOUS to relaying peers // Tracks sending of VERB_RENDEZVOUS to relaying peers
std::map< Array< Address,2 >,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior struct _LastUniteKey
{
_LastUniteKey() : x(0),y(0) {}
_LastUniteKey(const Address &a1,const Address &a2)
{
if (a1 > a2) {
x = a2.toInt();
y = a1.toInt();
} else {
x = a1.toInt();
y = a2.toInt();
}
}
inline unsigned long hashCode() const throw() { return ((unsigned long)x ^ (unsigned long)y); }
inline bool operator==(const _LastUniteKey &k) const throw() { return ((x == k.x)&&(y == k.y)); }
uint64_t x,y;
};
Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
Mutex _lastUniteAttempt_m; Mutex _lastUniteAttempt_m;
// Active attempts to contact remote peers, including state of multi-phase NAT traversal // Active attempts to contact remote peers, including state of multi-phase NAT traversal

View file

@ -103,7 +103,7 @@ SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
SharedPtr<Peer> p(_activePeers.insert(std::pair< Address,SharedPtr<Peer> >(peer->address(),peer)).first->second); SharedPtr<Peer> &p = _activePeers.set(peer->address(),peer);
p->use(now); p->use(now);
_saveIdentity(p->identity()); _saveIdentity(p->identity());
@ -160,9 +160,9 @@ SharedPtr<Peer> Topology::getBestRoot(const Address *avoid,unsigned int avoidCou
if (++sna == _rootAddresses.end()) if (++sna == _rootAddresses.end())
sna = _rootAddresses.begin(); // wrap around at end sna = _rootAddresses.begin(); // wrap around at end
if (*sna != RR->identity.address()) { // pick one other than us -- starting from me+1 in sorted set order if (*sna != RR->identity.address()) { // pick one other than us -- starting from me+1 in sorted set order
std::map< Address,SharedPtr<Peer> >::const_iterator p(_activePeers.find(*sna)); SharedPtr<Peer> *p = _activePeers.get(*sna);
if ((p != _activePeers.end())&&(p->second->hasActiveDirectPath(now))) { if ((p)&&((*p)->hasActiveDirectPath(now))) {
bestRoot = p->second; bestRoot = *p;
break; break;
} }
} }
@ -249,10 +249,12 @@ bool Topology::isRoot(const Identity &id) const
void Topology::clean(uint64_t now) void Topology::clean(uint64_t now)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();) { Hashtable< Address,SharedPtr<Peer> >::Iterator i(_activePeers);
if (((now - p->second->lastUsed()) >= ZT_PEER_IN_MEMORY_EXPIRATION)&&(std::find(_rootAddresses.begin(),_rootAddresses.end(),p->first) == _rootAddresses.end())) { Address *a = (Address *)0;
_activePeers.erase(p++); SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
} else ++p; while (i.next(a,p))
if (((now - (*p)->lastUsed()) >= ZT_PEER_IN_MEMORY_EXPIRATION)&&(std::find(_rootAddresses.begin(),_rootAddresses.end(),*a) == _rootAddresses.end())) {
_activePeers.erase(*a);
} }
} }

View file

@ -44,6 +44,7 @@
#include "Mutex.hpp" #include "Mutex.hpp"
#include "InetAddress.hpp" #include "InetAddress.hpp"
#include "Dictionary.hpp" #include "Dictionary.hpp"
#include "Hashtable.hpp"
namespace ZeroTier { namespace ZeroTier {
@ -163,17 +164,20 @@ public:
inline void eachPeer(F f) inline void eachPeer(F f)
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
for(std::map< Address,SharedPtr<Peer> >::const_iterator p(_activePeers.begin());p!=_activePeers.end();++p) Hashtable< Address,SharedPtr<Peer> >::Iterator i(_activePeers);
f(*this,p->second); Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p))
f(*this,*p);
} }
/** /**
* @return All currently active peers by address * @return All currently active peers by address
*/ */
inline std::map< Address,SharedPtr<Peer> > allPeers() const inline std::vector< std::pair< Address,SharedPtr<Peer> > > allPeers() const
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
return _activePeers; return _activePeers.entries();
} }
/** /**
@ -190,7 +194,7 @@ private:
const RuntimeEnvironment *RR; const RuntimeEnvironment *RR;
std::map< Address,SharedPtr<Peer> > _activePeers; Hashtable< Address,SharedPtr<Peer> > _activePeers;
std::map< Identity,std::vector<InetAddress> > _roots; std::map< Identity,std::vector<InetAddress> > _roots;
std::vector< Address > _rootAddresses; std::vector< Address > _rootAddresses;
std::vector< SharedPtr<Peer> > _rootPeers; std::vector< SharedPtr<Peer> > _rootPeers;