diff --git a/node/Constants.hpp b/node/Constants.hpp
index a0d175156..cf0be114d 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -193,7 +193,7 @@
 /**
  * How often Topology::clean() and Network::clean() and similar are called, in ms
  */
-#define ZT_DB_CLEAN_PERIOD 300000
+#define ZT_DB_CLEAN_PERIOD 120000
 
 /**
  * How long to remember peer records in RAM if they haven't been used
@@ -238,19 +238,9 @@
 #define ZT_MULTICAST_LOCAL_POLL_PERIOD 10000
 
 /**
- * Minimum delay between multicast endpoint gathering attempts
- *
- * Actual delay will vary between MIN and MAX depending on how many
- * endpoints we have -- MIN for 0, MAX for one less than limit.
- * If we have the limit of known multicast endpoints, no further attempts
- * to gather them are made.
+ * Delay between explicit MULTICAST_GATHER requests for a given multicast channel
  */
-#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN (ZT_MULTICAST_LIKE_EXPIRE / 60)
-
-/**
- * Maximum delay between multicast endpoint gathering attempts
- */
-#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX (ZT_MULTICAST_LIKE_EXPIRE / 2)
+#define ZT_MULTICAST_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
 
 /**
  * Timeout for outgoing multicasts
@@ -258,12 +248,17 @@
  * Attempts will be made to gather recipients and send until we reach
  * the limit or sending times out.
  */
-#define ZT_MULTICAST_TRANSMIT_TIMEOUT (ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN * 3)
+#define ZT_MULTICAST_TRANSMIT_TIMEOUT (ZT_MULTICAST_GATHER_DELAY * 2)
+
+/**
+ * Default number of endpoints to implicitly gather from peers with each multicast frame
+ */
+#define ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER 8
 
 /**
  * Default maximum number of peers to address with a single multicast (if unspecified in network)
 */
-#define ZT_DEFAULT_MULTICAST_LIMIT 64
+#define ZT_MULTICAST_DEFAULT_LIMIT 128
 
 /**
  * Delay between scans of the topology active peer DB for peers that need ping
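The two adaptive gather-delay constants collapse into a single fixed delay of one tenth of the LIKE expiration window. A minimal sketch of the rate-limit pattern Multicaster::send() builds on this constant (the 600000 ms value for ZT_MULTICAST_LIKE_EXPIRE and the helper itself are assumptions for illustration, not part of this patch):

    #include <cstdint>

    #define ZT_MULTICAST_LIKE_EXPIRE 600000                           // assumed stock value (ms)
    #define ZT_MULTICAST_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10) // 60000 ms

    // Hypothetical helper mirroring the check added to Multicaster::send():
    // permit at most one explicit MULTICAST_GATHER per group per delay window.
    static bool gatherAllowed(uint64_t now,uint64_t &lastExplicitGather)
    {
    	if ((now - lastExplicitGather) < ZT_MULTICAST_GATHER_DELAY)
    		return false;
    	lastExplicitGather = now;
    	return true;
    }
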
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index e8ab1ea10..39fc700d5 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -28,16 +28,18 @@
 #include <algorithm>
 
 #include "Constants.hpp"
+#include "SharedPtr.hpp"
 #include "Multicaster.hpp"
 #include "Topology.hpp"
 #include "Switch.hpp"
 #include "Packet.hpp"
+#include "Peer.hpp"
 #include "RuntimeEnvironment.hpp"
 
 namespace ZeroTier {
 
 Multicaster::Multicaster() :
-	_limit(ZT_DEFAULT_MULTICAST_LIMIT)
+	_limit(ZT_MULTICAST_DEFAULT_LIMIT)
{
}
 
@@ -54,29 +56,50 @@ void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,unsigned int limit,uint64_t now,const MulticastGroup &mg,const MAC &src,unsigned int etherType,const void *data,unsigned int len)
 	if (gs.members.size() >= limit) {
 		// If we already have enough members, just send and we're done -- no need for TX queue
 		OutboundMulticast out;
-		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
-		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
+		out.init(now,RR->identity.address(),nwid,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		unsigned int count = 0;
+		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
 			out.sendOnly(*(RR->sw),m->address);
+			if (++count >= limit)
+				break;
+		}
 	} else {
 		// If we don't already have enough members, send to the ones we have and then gather (if allowed within gather rate limit delay)
 		gs.txQueue.push_back(OutboundMulticast());
 		OutboundMulticast &out = gs.txQueue.back();
-		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
+		out.init(now,RR->identity.address(),nwid,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
 			out.sendAndLog(*(RR->sw),m->address);
+		if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_GATHER_DELAY) {
+			gs.lastExplicitGather = now;
+			// Explicitly gather -- right now we only do this from supernodes since they
+			// know all multicast group memberships. In the future this might be more
+			// distributed somehow.
+			SharedPtr<Peer> sn(RR->topology->getBestSupernode());
+			if (sn) {
+				Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+				outp.append(nwid);
+				outp.append((char)0); // TODO: include network membership cert
+				mg.mac().appendTo(outp);
+				outp.append((uint32_t)mg.adi());
+				outp.append((uint32_t)((limit - (unsigned int)gs.members.size()) + 1)); // +1 just means we'll have an extra in the queue if available
+				outp.armor(sn->key(),true);
+				sn->send(RR,outp.data(),outp.size(),now);
+			}
+		}
 	}
 }
 
-void Multicaster::clean(uint64_t now,const Topology &topology)
+void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit)
 {
 	Mutex::Lock _l(_groups_m);
 	for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
 		// Remove expired outgoing multicasts from multicast TX queue
 		for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
-			if (tx->expired(now))
+			if ((tx->expired(now))||(tx->sentToCount() >= limit))
 				mm->second.txQueue.erase(tx++);
 			else ++tx;
 		}
@@ -98,12 +121,12 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 			 * about them minus one day (a large constant) to put these at the bottom of the list.
 			 * List is sorted in ascending order of rank and multicasts are sent last-to-first. */
 			if (writer->learnedFrom) {
-				SharedPtr<Peer> p(topology.getPeer(writer->learnedFrom));
+				SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
 				if (p)
 					writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE;
 				else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
 			} else {
-				SharedPtr<Peer> p(topology.getPeer(writer->address));
+				SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
 				if (p)
 					writer->rank = p->lastUnicastFrame();
 				else writer->rank = writer->timestamp - 86400000;
@@ -127,24 +150,18 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 	}
 }
 
-void Multicaster::_add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
 {
 	// assumes _groups_m is locked
-}
-
-unsigned int Multicaster::_want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit)
-{
-	if (gs.members.size() >= limit) {
-		// We already caught our limit, don't need to go fishing any more.
-		return 0;
-	} else {
-		// Compute the delay between fishing expeditions from the fraction of the limit that we already have.
-		const uint64_t rateDelay = (uint64_t)ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN + (uint64_t)(((double)gs.members.size() / (double)limit) * (double)(ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX - ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN));
-		if ((now - gs.lastGatheredMembers) >= rateDelay) {
-			gs.lastGatheredMembers = now;
-			return (limit - (unsigned int)gs.members.size());
-		} else return 0;
+	for(std::vector<MulticastGroupMember>::iterator m(gs.members.begin());m!=gs.members.end();++m) {
+		if (m->address == member) {
+			if (m->learnedFrom)
+				m->learnedFrom = learnedFrom; // only update with an indirect learnedFrom if we've never directly learned from this peer
+			m->timestamp = now;
+			return;
+		}
 	}
+	gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
 }
 
 } // namespace ZeroTier
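For reference, the MULTICAST_GATHER request body assembled in send() above is 23 bytes. A standalone sketch of the same layout (encodeGather() is a hypothetical helper; the field order follows the outp.append() calls, and big-endian byte order is assumed per ZeroTier's wire convention):

    #include <cstdint>
    #include <cstring>

    // Hypothetical encoder for the MULTICAST_GATHER payload built in send():
    // 8-byte network ID, 1 flags byte, 6-byte group MAC, 4-byte ADI, 4-byte
    // desired member count, all big-endian. Returns bytes written (23).
    static unsigned int encodeGather(uint8_t buf[23],uint64_t nwid,const uint8_t mac[6],uint32_t adi,uint32_t want)
    {
    	unsigned int p = 0;
    	for(int i=7;i>=0;--i) buf[p++] = (uint8_t)(nwid >> (i * 8)); // network ID
    	buf[p++] = 0;                                                // flags (no cert yet)
    	std::memcpy(buf + p,mac,6); p += 6;                          // multicast group MAC
    	for(int i=3;i>=0;--i) buf[p++] = (uint8_t)(adi >> (i * 8));  // group ADI
    	for(int i=3;i>=0;--i) buf[p++] = (uint8_t)(want >> (i * 8)); // max members desired
    	return p;
    }
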
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index b757d8a38..0785c58bf 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -56,11 +56,11 @@ private:
 	struct MulticastGroupMember
 	{
 		MulticastGroupMember() {}
-		MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts) {}
+		MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts),rank(0) {}
 
 		Address address;
 		Address learnedFrom; // NULL/0 for addresses directly learned from LIKE
-		uint64_t timestamp; // time of last LIKE or OK response to MULTICAST_LONELY
+		uint64_t timestamp; // time of last LIKE/OK(GATHER)
 		uint64_t rank; // used by sorting algorithm in clean()
 
 		// for sorting in ascending order of rank
@@ -69,9 +69,9 @@ private:
 
 	struct MulticastGroupStatus
 	{
-		MulticastGroupStatus() : lastGatheredMembers(0) {}
+		MulticastGroupStatus() : lastExplicitGather(0) {}
 
-		uint64_t lastGatheredMembers; // time we last gathered members
+		uint64_t lastExplicitGather; // time we last gathered members explicitly
 		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
 		std::vector<MulticastGroupMember> members; // members of this group
 	};
@@ -84,14 +84,15 @@ public:
 	 * Add or update a member in a multicast group and send any pending multicasts
 	 *
 	 * @param RR Runtime environment
+	 * @param now Current time
 	 * @param mg Multicast group
 	 * @param learnedFrom Address from which we learned this member or NULL/0 Address if direct
 	 * @param member New member address
 	 */
-	inline void add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+	inline void add(const RuntimeEnvironment *RR,uint64_t now,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
 	{
 		Mutex::Lock _l(_groups_m);
-		_add(RR,mg,learnedFrom,member);
+		_add(RR,now,_groups[mg],learnedFrom,member);
 	}
 
 /**
@@ -114,12 +115,12 @@ public:
 	 *
 	 * @param RR Runtime environment
 	 * @param now Current time
+	 * @param limit Multicast limit
 	 */
-	void clean(const RuntimeEnvironment *RR,uint64_t now);
+	void clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit);
 
 private:
-	void _add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
-	unsigned int _want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit);
+	void _add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
 
 	std::map< MulticastGroup,MulticastGroupStatus > _groups;
 	Mutex _groups_m;
diff --git a/node/Network.cpp b/node/Network.cpp
index 70f5ae417..bed50e171 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -318,7 +318,7 @@ void Network::clean()
 		}
 	}
 	{
-		_multicastTopology.clean(now,*(RR->topology),(_config) ? _config->multicastLimit() : (unsigned int)ZT_DEFAULT_MULTICAST_LIMIT);
+		_multicastTopology.clean(RR,now,(_config) ? _config->multicastLimit() : (unsigned int)ZT_MULTICAST_DEFAULT_LIMIT);
 	}
 }
diff --git a/node/NetworkConfig.cpp b/node/NetworkConfig.cpp
index 8b85cd87a..ddb98cce5 100644
--- a/node/NetworkConfig.cpp
+++ b/node/NetworkConfig.cpp
@@ -86,7 +86,7 @@ void NetworkConfig::_fromDictionary(const Dictionary &d)
 	_timestamp = Utils::hexStrToU64(d.get(ZT_NETWORKCONFIG_DICT_KEY_TIMESTAMP).c_str());
 	_issuedTo = Address(d.get(ZT_NETWORKCONFIG_DICT_KEY_ISSUED_TO));
 	_multicastLimit = Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_MULTICAST_LIMIT,zero).c_str());
-	if (_multicastLimit == 0) _multicastLimit = ZT_DEFAULT_MULTICAST_LIMIT;
+	if (_multicastLimit == 0) _multicastLimit = ZT_MULTICAST_DEFAULT_LIMIT;
 	_allowPassiveBridging = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_ALLOW_PASSIVE_BRIDGING,zero).c_str()) != 0);
 	_private = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_PRIVATE,one).c_str()) != 0);
 	_enableBroadcast = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_ENABLE_BROADCAST,one).c_str()) != 0);
diff --git a/node/OutboundMulticast.hpp b/node/OutboundMulticast.hpp
index 8e71b6adf..9ce59bbf8 100644
--- a/node/OutboundMulticast.hpp
+++ b/node/OutboundMulticast.hpp
@@ -63,6 +63,7 @@ public:
 	 * @param timestamp Creation time
 	 * @param self My ZeroTier address
 	 * @param nwid Network ID
+	 * @param gatherLimit Number to lazily/implicitly gather with this frame or 0 for none
 	 * @param src Source MAC address of frame
 	 * @param dest Destination multicast group (MAC + ADI)
 	 * @param etherType 16-bit Ethernet type ID
@@ -70,7 +71,7 @@
 	 * @param len Length of data
 	 * @throws std::out_of_range Data too large to fit in a MULTICAST_FRAME
 	 */
-	inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
+	inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,unsigned int gatherLimit,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
 	{
 		_timestamp = timestamp;
 		_nwid = nwid;
@@ -79,7 +80,9 @@ public:
 		_etherType = etherType;
 		_packet.setSource(self);
 		_packet.setVerb(Packet::VERB_MULTICAST_FRAME);
-		_packet.append((char)0);
+		_packet.append((uint64_t)nwid);
+		_packet.append((char)0); // 0 flags
+		_packet.append((uint32_t)gatherLimit); // gather limit
 		_packet.append((uint32_t)dest.adi());
 		dest.mac().appendTo(_packet);
 		src.appendTo(_packet);
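With the gather limit folded in, init() now writes a fixed 31-byte header ahead of the Ethernet payload. A sketch of the resulting field offsets (the struct is illustrative only -- the real code serializes through Packet::append(), and these offsets assume the append order above):

    #include <cstdint>

    // Illustrative view of the MULTICAST_FRAME verb payload written by init().
    // Offsets are from the start of the verb payload; multi-byte fields are
    // big-endian on the wire, so this is documentation, not a cast target.
    struct MulticastFrameHeader {
    	uint64_t nwid;        // [0]  64-bit network ID
    	uint8_t flags;        // [8]  currently 0
    	uint32_t gatherLimit; // [9]  implicit gather limit, 0 for none
    	uint32_t adi;         // [13] multicast ADI (precedes MAC; see Packet.hpp)
    	uint8_t destMac[6];   // [17] destination multicast MAC
    	uint8_t srcMac[6];    // [23] source MAC
    	uint16_t etherType;   // [29] Ethernet frame type
    	// [31] ... Ethernet payload follows
    };
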
diff --git a/node/Packet.hpp b/node/Packet.hpp
index 214479d6e..cfeb66f8d 100644
--- a/node/Packet.hpp
+++ b/node/Packet.hpp
@@ -660,8 +660,8 @@ public:
 	VERB_NETWORK_CONFIG_REFRESH = 12,
 
 	/* Request endpoints for multicast distribution:
-	 *   <[1] flags>
 	 *   <[8] 64-bit network ID>
+	 *   <[1] flags>
 	 *   <[6] MAC address of multicast group being queried>
 	 *   <[4] 32-bit ADI for multicast group being queried>
 	 *   <[4] 32-bit (suggested) max number of multicast peers desired or 0 for no limit>
@@ -683,9 +683,12 @@
 	 *   <[2] 16-bit number of members enumerated in this packet>
 	 *   <[...] series of 5-byte ZeroTier addresses of enumerated members>
 	 *
+	 * If no endpoints are known, OK and ERROR are both optional. It's okay
+	 * to return nothing in that case since gathering is "lazy."
+	 *
 	 * ERROR response payload:
 	 *   <[8] 64-bit network ID>
 	 *   <[6] MAC address of multicast group being queried>
 	 *   <[4] 32-bit ADI for multicast group being queried>
 	 */
 	VERB_MULTICAST_GATHER = 13,
 
@@ -696,16 +699,26 @@
 	/* Multicast frame:
 	 *   <[8] 64-bit network ID>
 	 *   <[1] flags (currently unused, must be 0)>
+	 *   <[4] 32-bit (suggested) gather limit or 0 for no gathering>
 	 *   <[4] 32-bit multicast ADI (note that this is out of order here -- it precedes MAC)>
 	 *   <[6] destination MAC or all zero for destination node>
 	 *   <[6] source MAC or all zero for node of origin>
 	 *   <[2] 16-bit ethertype>
 	 *   <[...] ethernet payload>
 	 *
 	 * This is similar to EXT_FRAME but carries a multicast, and is sent
 	 * out to recipients on a multicast list.
 	 *
-	 * OK is not generated.
+	 * (ADI precedes MAC here so that everything from destination MAC forward
+	 * could be treated as a raw Ethernet frame.)
+	 *
+	 * OK response payload:
+	 *   <[1] flags>
+	 *   [... same as OK(GATHER) if flag 0x01 is set ...]
+	 *
+	 * Flags in OK are 0x01 for "gathering results returned," which can be
+	 * sent if a gather limit is specified in the original FRAME and there
+	 * are known endpoints to gather. This way frames can also gather.
 	 *
 	 * ERROR response payload:
 	 *   <[6] multicast group MAC>
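The OK(MULTICAST_GATHER) member list is the part a receiver feeds back into Multicaster::add(). A minimal sketch of decoding that tail (decodeGatherResults() is a hypothetical helper; the big-endian 16-bit count and 5-byte address format follow the payload description above):

    #include <cstdint>
    #include <vector>

    // Hypothetical decoder for the enumerated-member tail of OK(MULTICAST_GATHER):
    // a big-endian 16-bit count followed by that many 5-byte ZeroTier addresses.
    static std::vector<uint64_t> decodeGatherResults(const uint8_t *buf,unsigned int len)
    {
    	std::vector<uint64_t> members;
    	if (len < 2)
    		return members;
    	unsigned int count = ((unsigned int)buf[0] << 8) | (unsigned int)buf[1];
    	unsigned int p = 2;
    	while ((count > 0)&&((p + 5) <= len)) {
    		uint64_t a = 0;
    		for(int i=0;i<5;++i)
    			a = (a << 8) | (uint64_t)buf[p++]; // 40-bit ZeroTier address
    		members.push_back(a);
    		--count;
    	}
    	return members;
    }
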