Make network multicast breadth/depth parameters configurable on a per-net basis.

This commit is contained in:
Adam Ierymenko 2013-09-17 16:11:57 -04:00
parent 4c06fcfc9d
commit f9079a110e
7 changed files with 65 additions and 27 deletions

View file

@ -204,15 +204,19 @@ int main(int argc,char **argv)
} }
bool isOpen = false; bool isOpen = false;
unsigned int mcb = 3;
unsigned int mcd = 6;
std::string name,desc; std::string name,desc;
{ {
Query q = dbCon->query(); Query q = dbCon->query();
q << "SELECT name,`desc`,isOpen FROM Network WHERE id = " << nwid; q << "SELECT name,`desc`,isOpen,multicastPropagationBreadth,multicastPropagationDepth FROM Network WHERE id = " << nwid;
StoreQueryResult rs = q.store(); StoreQueryResult rs = q.store();
if (rs.num_rows() > 0) { if (rs.num_rows() > 0) {
name = rs[0]["name"].c_str(); name = rs[0]["name"].c_str();
desc = rs[0]["desc"].c_str(); desc = rs[0]["desc"].c_str();
isOpen = ((int)rs[0]["isOpen"] > 0); isOpen = ((int)rs[0]["isOpen"] > 0);
mcb = (unsigned int)rs[0]["multicastPropagationBreadth"];
mcd = (unsigned int)rs[0]["multicastPropagationDepth"];
} else { } else {
Dictionary response; Dictionary response;
response["peer"] = peerIdentity.address().toString(); response["peer"] = peerIdentity.address().toString();
@ -293,6 +297,10 @@ int main(int argc,char **argv)
sprintf(buf,"%llx",(unsigned long long)Utils::now()); sprintf(buf,"%llx",(unsigned long long)Utils::now());
netconf["ts"] = buf; netconf["ts"] = buf;
netconf["peer"] = peerIdentity.address().toString(); netconf["peer"] = peerIdentity.address().toString();
sprintf(buf,"%x",mcb);
netconf["mcb"] = mcb;
sprintf(buf,"%x",mcd);
netconf["mcd"] = mcd;
if (!isOpen) { if (!isOpen) {
// TODO: handle closed networks, look up private membership, // TODO: handle closed networks, look up private membership,

View file

@ -233,25 +233,12 @@ error_no_ZT_ARCH_defined;
/** /**
* Breadth of tree for rumor mill multicast propagation * Breadth of tree for rumor mill multicast propagation
*/ */
#define ZT_MULTICAST_PROPAGATION_BREADTH 4 #define ZT_MULTICAST_DEFAULT_PROPAGATION_BREADTH 3
/** /**
* Depth of tree for rumor mill multicast propagation * Depth of tree for rumor mill multicast propagation
*
* The maximum number of peers who can receive a multicast is equal to
* the sum of BREADTH^i where I is from 1 to DEPTH. This ignores the effect
* of the rate limiting algorithm or bloom filter collisions.
*
* 5 results in a max of 1364 recipients for a given multicast. With a limit
* of 50 bytes/sec (average) for multicast, this results in a worst case of
* around 68kb/sec of multicast traffic. FYI the average multicast traffic
* from a Mac seems to be about ~25bytes/sec. Windows measurements are TBD.
* Linux is quieter than Mac.
*
* This are eventually going to become per-network tunable parameters, along
* with per-network peer multicast rate limits.
*/ */
#define ZT_MULTICAST_PROPAGATION_DEPTH 5 #define ZT_MULTICAST_DEFAULT_PROPAGATION_DEPTH 6
/** /**
* Length of ring buffer history of recent multicast packets * Length of ring buffer history of recent multicast packets

View file

@ -52,9 +52,6 @@
#include "CMWC4096.hpp" #include "CMWC4096.hpp"
#include "C25519.hpp" #include "C25519.hpp"
// Maximum sample size to pick during choice of multicast propagation peers
#define ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE (ZT_MULTICAST_PROPAGATION_BREADTH * 8)
namespace ZeroTier { namespace ZeroTier {
/** /**
@ -253,7 +250,7 @@ public:
Mutex::Lock _l(_multicastMemberships_m); Mutex::Lock _l(_multicastMemberships_m);
std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg))); std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) { if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
for(unsigned int stries=0;((stries<ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE)&&(chosen < max));++stries) { for(unsigned int stries=0,stmax=(max*10);((stries<stmax)&&(chosen < max));++stries) {
MulticastMembership &m = mm->second[prng.next32() % mm->second.size()]; MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
unsigned int sum = m.first.sum(); unsigned int sum = m.first.sum();
if ( if (
@ -320,7 +317,7 @@ public:
Mutex::Lock _l(_multicastMemberships_m); Mutex::Lock _l(_multicastMemberships_m);
std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg))); std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) { if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
for(unsigned int stries=0;stries<ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE;++stries) { for(unsigned int stries=0,stmax=(max*10);stries<stmax;++stries) {
MulticastMembership &m = mm->second[prng.next32() % mm->second.size()]; MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
if ( if (
((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */ ((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */

View file

@ -159,6 +159,8 @@ SharedPtr<Network> Network::newInstance(const RuntimeEnvironment *renv,uint64_t
nw->_ready = false; // disable handling of Ethernet frames during construct nw->_ready = false; // disable handling of Ethernet frames during construct
nw->_r = renv; nw->_r = renv;
nw->_tap = new EthernetTap(renv,tag,renv->identity.address().toMAC(),ZT_IF_MTU,&_CBhandleTapData,nw.ptr()); nw->_tap = new EthernetTap(renv,tag,renv->identity.address().toMAC(),ZT_IF_MTU,&_CBhandleTapData,nw.ptr());
nw->_multicastPropagationBreadth = 0;
nw->_multicastPropagationDepth = 0;
memset(nw->_etWhitelist,0,sizeof(nw->_etWhitelist)); memset(nw->_etWhitelist,0,sizeof(nw->_etWhitelist));
nw->_id = id; nw->_id = id;
nw->_lastConfigUpdate = 0; nw->_lastConfigUpdate = 0;
@ -179,6 +181,8 @@ void Network::setConfiguration(const Network::Config &conf)
_configuration = conf; _configuration = conf;
_myCertificate = conf.certificateOfMembership(); _myCertificate = conf.certificateOfMembership();
_mcRates = conf.multicastRates(); _mcRates = conf.multicastRates();
_multicastPropagationBreadth = conf.multicastPropagationBreadth();
_multicastPropagationDepth = conf.multicastPropagationDepth();
_lastConfigUpdate = Utils::now(); _lastConfigUpdate = Utils::now();
_tap->setIps(conf.staticAddresses()); _tap->setIps(conf.staticAddresses());

View file

@ -317,6 +317,28 @@ public:
return get("desc",std::string()); return get("desc",std::string());
} }
/**
* @return Breadth for multicast propagation
*/
inline unsigned int multicastPropagationBreadth() const
{
const_iterator mcb(find("mcb"));
if (mcb == end())
return ZT_MULTICAST_DEFAULT_PROPAGATION_BREADTH;
return Utils::hexStrToUInt(mcb->second.c_str());
}
/**
* @return Depth for multicast propagation
*/
inline unsigned int multicastPropagationDepth() const
{
const_iterator mcd(find("mcd"));
if (mcd == end())
return ZT_MULTICAST_DEFAULT_PROPAGATION_DEPTH;
return Utils::hexStrToUInt(mcd->second.c_str());
}
/** /**
* @return Certificate of membership for this network, or empty cert if none * @return Certificate of membership for this network, or empty cert if none
*/ */
@ -586,6 +608,24 @@ public:
//return tmp; //return tmp;
} }
/**
* @return Breadth for multicast rumor mill propagation
*/
inline unsigned int multicastPropagationBreadth() const
throw()
{
return _multicastPropagationBreadth;
}
/**
* @return Depth for multicast rumor mill propagation
*/
inline unsigned int multicastPropagationDepth() const
throw()
{
return _multicastPropagationDepth;
}
private: private:
static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data); static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data);
void _restoreState(); void _restoreState();
@ -606,6 +646,8 @@ private:
Config _configuration; Config _configuration;
CertificateOfMembership _myCertificate; // memoized from _configuration CertificateOfMembership _myCertificate; // memoized from _configuration
MulticastRates _mcRates; // memoized from _configuration MulticastRates _mcRates; // memoized from _configuration
unsigned int _multicastPropagationBreadth; // memoized from _configuration
unsigned int _multicastPropagationDepth; // memoized from _configuration
// Ethertype whitelist bit field, set from config, for really fast lookup // Ethertype whitelist bit field, set from config, for really fast lookup
unsigned char _etWhitelist[65536 / 8]; unsigned char _etWhitelist[65536 / 8];

View file

@ -538,14 +538,14 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
_r->multicaster->addToDedupHistory(mccrc,now); _r->multicaster->addToDedupHistory(mccrc,now);
} }
if (++hops >= ZT_MULTICAST_PROPAGATION_DEPTH) { if (++hops >= network->multicastPropagationDepth()) {
TRACE("not propagating MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); TRACE("not propagating MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
return true; return true;
} }
Address upstream(source()); // save this since we might mangle it below Address upstream(source()); // save this since we might mangle it below
Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES)); Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; SharedPtr<Peer> propPeers[16];
unsigned int np = 0; unsigned int np = 0;
if (_r->topology->amSupernode()) { if (_r->topology->amSupernode()) {
@ -567,7 +567,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
originalSubmitterAddress, originalSubmitterAddress,
upstream, upstream,
bloom, bloom,
ZT_MULTICAST_PROPAGATION_BREADTH, std::min(network->multicastPropagationBreadth(),(unsigned int)16), // 16 is a sanity check
propPeers, propPeers,
now); now);
} else if (isDuplicate) { } else if (isDuplicate) {
@ -584,7 +584,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
originalSubmitterAddress, originalSubmitterAddress,
upstream, upstream,
bloom, bloom,
ZT_MULTICAST_PROPAGATION_BREADTH, std::min(network->multicastPropagationBreadth(),(unsigned int)16), // 16 is a sanity check
propPeers, propPeers,
now); now);
} }

View file

@ -119,7 +119,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
} }
Multicaster::MulticastBloomFilter bloom; Multicaster::MulticastBloomFilter bloom;
SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; SharedPtr<Peer> propPeers[16];
unsigned int np = _r->multicaster->pickSocialPropagationPeers( unsigned int np = _r->multicaster->pickSocialPropagationPeers(
*(_r->prng), *(_r->prng),
*(_r->topology), *(_r->topology),
@ -128,7 +128,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
_r->identity.address(), _r->identity.address(),
Address(), Address(),
bloom, bloom,
ZT_MULTICAST_PROPAGATION_BREADTH, std::min(network->multicastPropagationBreadth(),(unsigned int)16), // 16 is a sanity check
propPeers, propPeers,
now); now);