New multicast algorithm work in progress...

This commit is contained in:
Adam Ierymenko 2013-09-21 16:46:00 -04:00
parent 64c9c2e06b
commit 770fbaf4b2
5 changed files with 98 additions and 317 deletions

View file

@ -230,26 +230,6 @@ error_no_ZT_ARCH_defined;
*/ */
#define ZT_RELAY_MAX_HOPS 3 #define ZT_RELAY_MAX_HOPS 3
/**
* Breadth of tree for rumor mill multicast propagation
*/
#define ZT_MULTICAST_DEFAULT_PROPAGATION_BREADTH 3
/**
* Depth of tree for rumor mill multicast propagation
*/
#define ZT_MULTICAST_DEFAULT_PROPAGATION_DEPTH 6
/**
* Length of ring buffer history of recent multicast packets
*/
#define ZT_MULTICAST_DEDUP_HISTORY_LENGTH 1024
/**
* Expiration time in ms for multicast deduplication history items
*/
#define ZT_MULTICAST_DEDUP_HISTORY_EXPIRE 2000
/** /**
* Period between announcements of all multicast 'likes' in ms * Period between announcements of all multicast 'likes' in ms
* *
@ -264,6 +244,11 @@ error_no_ZT_ARCH_defined;
*/ */
#define ZT_MULTICAST_LIKE_EXPIRE ((ZT_MULTICAST_LIKE_ANNOUNCE_ALL_PERIOD * 2) + 1000) #define ZT_MULTICAST_LIKE_EXPIRE ((ZT_MULTICAST_LIKE_ANNOUNCE_ALL_PERIOD * 2) + 1000)
/**
* Expiration for remembered MULTICAST_GOTs, in ms
*/
#define ZT_MULTICAST_MAGNET_STATE_EXPIRE 30000
/** /**
* Time between polls of local taps for multicast membership changes * Time between polls of local taps for multicast membership changes
*/ */

View file

@ -31,349 +31,146 @@
#include <stdint.h> #include <stdint.h>
#include <string.h> #include <string.h>
#include <utility>
#include <algorithm>
#include <stdexcept> #include <stdexcept>
#include <map> #include <map>
#include <set>
#include <vector> #include <vector>
#include <string> #include <set>
#include <algorithm>
#include "Constants.hpp" #include "Constants.hpp"
#include "Buffer.hpp" #include "Mutex.hpp"
#include "Packet.hpp"
#include "MulticastGroup.hpp" #include "MulticastGroup.hpp"
#include "Utils.hpp" #include "Utils.hpp"
#include "MAC.hpp"
#include "Address.hpp" #include "Address.hpp"
#include "SharedPtr.hpp"
#include "BloomFilter.hpp"
#include "Identity.hpp"
#include "CMWC4096.hpp"
#include "C25519.hpp"
namespace ZeroTier { namespace ZeroTier {
/** /**
* Multicast propagation engine * Multicast propagation algorithm
*
* This is written as a generic class so that it can be mocked and tested
* in simulation. It also always takes 'now' as an argument, permitting
* running in simulated time.
*
* This does not handle network permission or rate limiting, only the
* propagation algorithm.
*/ */
class Multicaster class Multicaster
{ {
public: public:
/** Multicaster() {}
* Simple bit field bloom filter included with multicast frame packets
*/
typedef BloomFilter<ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BITS> MulticastBloomFilter;
Multicaster()
throw()
{
memset(_multicastHistory,0,sizeof(_multicastHistory));
_multicastHistoryPtr = 0;
}
/** /**
* Generate a signature of a multicast packet using an identity * Add or renew a peer's subscription to a multicast group
* *
* @param id Identity to sign with (must have secret key portion) * @param a Address that LIKEd
* @param nwid Network ID * @param mg Multicast group
* @param from MAC address of sender
* @param to Multicast group
* @param etherType 16-bit ethernet type
* @param data Ethernet frame data
* @param len Length of frame
* @return Signature of packet data and attributes
* @throws std::runtime_error Cannot sign, e.g. identity has no private key
*/
static inline C25519::Signature signMulticastPacket(const Identity &id,uint64_t nwid,const MAC &from,const MulticastGroup &to,unsigned int etherType,const void *data,unsigned int len)
throw(std::runtime_error)
{
char tmp[65536];
void *tmp2 = (void *)tmp;
*((uint64_t *)tmp2) = Utils::hton((uint64_t)nwid);
memcpy(tmp + 8,from.data,6);
memcpy(tmp + 14,to.mac().data,6);
*((uint32_t *)(tmp + 20)) = Utils::hton((uint32_t)to.adi());
*((uint16_t *)(tmp + 24)) = Utils::hton((uint16_t)etherType);
memcpy(tmp + 26,data,std::min((unsigned int)(sizeof(tmp) - 26),len)); // min() is a sanity check here, no packet is that big
return id.sign(tmp,len + 26);
}
/**
* Verify a signature from a multicast packet
*
* @param id Identity of original signer
* @param nwid Network ID
* @param from MAC address of sender
* @param to Multicast group
* @param etherType 16-bit ethernet type
* @param data Ethernet frame data
* @param len Length of frame
* @param signature Signature
* @param siglen Length of signature in bytes
* @return True if signature verification was successful
*/
static bool verifyMulticastPacket(const Identity &id,uint64_t nwid,const MAC &from,const MulticastGroup &to,unsigned int etherType,const void *data,unsigned int len,const void *signature,unsigned int siglen)
{
char tmp[65536];
void *tmp2 = (void *)tmp;
*((uint64_t *)tmp2) = Utils::hton(nwid);
memcpy(tmp + 8,from.data,6);
memcpy(tmp + 14,to.mac().data,6);
*((uint32_t *)(tmp + 20)) = Utils::hton(to.adi());
*((uint16_t *)(tmp + 24)) = Utils::hton((uint16_t)etherType);
memcpy(tmp + 26,data,std::min((unsigned int)(sizeof(tmp) - 26),len)); // min() is a sanity check here, no packet is that big
return id.verify(tmp,len + 26,signature,siglen);
}
/**
* Compute the CRC64 code for multicast deduplication
*
* @param nwid Network ID
* @param from Sender MAC
* @param to Destination multicast group
* @param etherType Ethernet frame type
* @param payload Multicast frame data
* @param len Length of frame
*/
static inline uint64_t computeMulticastDedupCrc(
uint64_t nwid,
const MAC &from,
const MulticastGroup &to,
unsigned int etherType,
const void *payload,
unsigned int len)
throw()
{
// This CRC is only used locally, so byte order issues and
// such don't matter. It can also be changed without protocol
// impact.
uint64_t crc = Utils::crc64(0,from.data,6);
crc = Utils::crc64(crc,to.mac().data,6);
crc ^= (uint64_t)to.adi();
crc ^= (uint64_t)etherType;
crc = Utils::crc64(crc,payload,len);
crc ^= nwid; // also include network ID in CRC
return crc;
}
/**
* Check multicast history to see if this is a duplicate
*
* @param crc Multicast CRC
* @param now Current time
* @return True if this appears to be a duplicate to within history expiration time
*/
inline bool checkDuplicate(uint64_t crc,uint64_t now) const
throw()
{
for(unsigned int i=0;i<ZT_MULTICAST_DEDUP_HISTORY_LENGTH;++i) {
if ((_multicastHistory[i][0] == crc)&&((now - _multicastHistory[i][1]) <= ZT_MULTICAST_DEDUP_HISTORY_EXPIRE))
return true;
}
return false;
}
/**
* Add a multicast CRC to the multicast deduplication history
*
* @param crc Multicast CRC
* @param now Current time * @param now Current time
*/ */
inline void addToDedupHistory(uint64_t crc,uint64_t now) inline void likesGroup(const Address &a,const MulticastGroup &mg,uint64_t now)
throw()
{ {
unsigned int mhi = ++_multicastHistoryPtr % ZT_MULTICAST_DEDUP_HISTORY_LENGTH; Mutex::Lock _l(_lock);
_multicastHistory[mhi][0] = crc; std::map< Address,_PeerInfo >::iterator pi(_peers.find(a));
_multicastHistory[mhi][1] = now; if (pi == _peers.end()) {
pi = _peers.insert(std::pair< Address,_PeerInfo >(a,_PeerInfo())).first;
_proximity.push_front(a);
pi->second.proximitySlot = _proximity.begin();
}
pi->second.groups[mg] = now;
} }
/** /**
* Update the most recent LIKE time for an address in a given multicast group on a given network * Bring a peer closer in terms of propagation priority
* *
* @param nwid Network ID * @param a Address to bring closer (e.g. due to unicast message)
* @param mg Multicast group * @param now Current time
* @param addr Address that likes group on given network
* @param now Current timestamp
*/ */
inline void likesMulticastGroup(const uint64_t nwid,const MulticastGroup &mg,const Address &addr,const uint64_t now) inline void bringCloser(const Address &a)
{ {
Mutex::Lock _l(_multicastMemberships_m); Mutex::Lock _l(_lock);
std::vector<MulticastMembership> &memberships = _multicastMemberships[MulticastChannel(nwid,mg)]; std::map< Address,_PeerInfo >::iterator pi(_peers.find(a));
for(std::vector<MulticastMembership>::iterator mm(memberships.begin());mm!=memberships.end();++mm) { if (pi != _peers.end()) {
if (mm->first == addr) { if (pi->second.proximitySlot != _proximity.begin())
mm->second = now; _proximity.splice(_proximity.begin(),_proximity,pi->second.proximitySlot);
return;
}
} }
memberships.push_back(MulticastMembership(addr,now));
} }
/** /**
* Choose peers for multicast propagation via random selection * Indicate that a peer reported that it GOT a multicast
* *
* @param prng Random source * This only happens on magnet nodes for a propagation.
* @param topology Topology object or mock thereof *
* @param nwid Network ID * @param mcGuid Multicast GUID
* @param mg Multicast group * @param peer Peer that GOT multicast
* @param originalSubmitter Original submitter of multicast message to network * @param now Current time
* @param upstream Address from which message originated, or null (0) address if none
* @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay
* @param max Maximum number of peers to pick
* @param peers Array of objects of type P to fill with up to [max] peers
* @param now Current timestamp
* @return Number of peers actually stored in peers array
* @tparam T Type of topology, which is Topology in running code or a mock in simulation
* @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
*/ */
template<typename T,typename P> inlien void got(const Address &peer,uint64_t mcGuid,uint64_t now)
inline unsigned int pickRandomPropagationPeers(
CMWC4096 &prng,
T &topology,
uint64_t nwid,
const MulticastGroup &mg,
const Address &originalSubmitter,
const Address &upstream,
MulticastBloomFilter &bf,
unsigned int max,
P *peers,
uint64_t now)
{ {
unsigned int chosen = 0; Mutex::Lock _l(_lock);
Mutex::Lock _l(_multicastMemberships_m); std::pair< uint64_t,std::set<Address> > &g = _got[mcGuid];
std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg))); g.first = now;
if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) { g.second.insert(peer);
for(unsigned int stries=0,stmax=(max*10);((stries<stmax)&&(chosen < max));++stries) {
MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
unsigned int sum = m.first.sum();
if (
((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */
(!bf.contains(sum))&& /* Not in propagation bloom */
(m.first != originalSubmitter)&& /* Not the original submitter */
(m.first != upstream) ) { /* Not where the frame came from */
P peer(topology.getPeer(m.first));
if (peer) {
unsigned int chk = 0;
while (chk < chosen) {
if (peers[chk] == peer)
break;
++chk;
}
if (chk == chosen) { /* not already picked */
peers[chosen++] = peer;
bf.set(sum);
}
}
}
}
}
return chosen;
} }
/** /**
* Choose peers for multicast propagation via implicit social switching * Erase entries for expired LIKEs
*
* @param prng Random source
* @param topology Topology object or mock thereof
* @param nwid Network ID
* @param mg Multicast group
* @param originalSubmitter Original submitter of multicast message to network
* @param upstream Address from which message originated, or null (0) address if none
* @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay
* @param max Maximum number of peers to pick
* @param peers Array of objects of type P to fill with up to [max] peers
* @param now Current timestamp
* @return Number of peers actually stored in peers array
* @tparam T Type of topology, which is Topology in running code or a mock in simulation
* @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
*/ */
template<typename T,typename P> inline void clean(uint64_t now)
inline unsigned int pickSocialPropagationPeers(
CMWC4096 &prng,
T &topology,
uint64_t nwid,
const MulticastGroup &mg,
const Address &originalSubmitter,
const Address &upstream,
MulticastBloomFilter &bf,
unsigned int max,
P *peers,
uint64_t now)
{ {
typename std::set< P,_PeerPropagationPrioritySortOrder<P> > toConsider; Mutex::Lock _l(_lock);
/* Pick up to ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE peers that meet for(std::map< uint64_t,std::pair< uint64_t,std::set<Address> > >::iterator g(_got.begin());g!=_got.end();) {
* our minimal criteria for this multicast group and place them if ((now - g->second.first) > ZT_MULTICAST_MAGNET_STATE_EXPIRE)
* into a set that is sorted in descending order of time of most _got.erase(g++);
* recent unicast frame transfer (implicit social ordering). */ else ++g;
{ }
Mutex::Lock _l(_multicastMemberships_m);
std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg))); for(std::map< Address,_PeerInfo >::iterator pi(_peers.begin());pi!=_peers.end();) {
if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) { for(std::map< MulticastGroup,uint64_t >::iterator g(pi->second.groups.begin());g!=pi->second.groups.end();) {
for(unsigned int stries=0,stmax=(max*10);stries<stmax;++stries) { if ((now - g->second) > ZT_MULTICAST_LIKE_EXPIRE)
MulticastMembership &m = mm->second[prng.next32() % mm->second.size()]; pi->second.groups.erase(g++);
if ( else ++g;
((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */ }
(!bf.contains(m.first.sum()))&& /* Not in propagation bloom */ if (pi->second.groups.empty()) {
(m.first != originalSubmitter)&& /* Not the original submitter */ _proximity.erase(pi->second.proximitySlot);
(m.first != upstream) ) { /* Not where the frame came from */ _peers.erase(pi++);
P peer(topology.getPeer(m.first)); } else ++pi;
if (peer) }
toConsider.insert(peer); /* Consider propagating to this peer */ }
}
} /**
* Pick next hops for a multicast by proximity
*
* @param mg Multicast group
* @param mcGuid Multicast message GUID (signer and signer unique ID)
* @param nextHopFunc Function to call for each address, search stops if it returns false
*/
template<typename F>
inline void getNextHops(const MulticastGroup &mg,uint64_t mcGuid,F nextHopFunc)
{
Mutex::Lock _l(_lock);
std::map< uint64_t,std::pair< uint64_t,std::set< Address > > > g(_got.find(mcGuid));
for(std::list< Address >::iterator a(_proximity.begin());a!=_proximity.end();++a) {
if (((g == _got.end())||(!g->second.second.count(*a)))&&(_peers.find(*a)->second.groups.count(mg))) {
if (!nextHopFunc(*a))
break;
} }
} }
/* The first peers in toConsider will be the "best" */
unsigned int chosen = 0;
for(typename std::set< P,_PeerPropagationPrioritySortOrder<P> >::iterator i(toConsider.begin());((i!=toConsider.end())&&(chosen < max));++i)
bf.set((peers[chosen++] = *i)->address().sum());
/* Tack on a supernode if we have no next hops */
if (!chosen) {
Address exclude[1];
exclude[0] = originalSubmitter; // if it came from a supernode, don't boomerang
P peer = topology.getBestSupernode(exclude,1,true);
if (peer)
peers[chosen++] = peer;
}
return chosen;
} }
private: private:
// Sort order for chosen propagation peers // GOTs by multicast GUID: time of last GOT, addresses that GOT
template<typename P> std::map< uint64_t,std::pair< uint64_t,std::set< Address > > > _got;
struct _PeerPropagationPrioritySortOrder
// Peer proximity ordering
std::list< Address > _proximity;
struct _PeerInfo
{ {
inline bool operator()(const P &p1,const P &p2) const // Groups and time of last LIKE for each group
{ std::map< MulticastGroup,uint64_t > groups;
return (p1->lastUnicastFrame() > p2->lastUnicastFrame());
} // Peer's slot in _proximity
std::list< Address >::iterator proximitySlot;
}; };
// ring buffer: [0] - CRC, [1] - timestamp // Time of last LIKE for each address's group subscriptions
uint64_t _multicastHistory[ZT_MULTICAST_DEDUP_HISTORY_LENGTH][2]; std::map< Address,_PeerInfo > _peers;
volatile unsigned int _multicastHistoryPtr;
// A multicast channel, essentially a pub/sub channel. It consists of a Mutex _lock;
// network ID and a multicast group within that network.
typedef std::pair<uint64_t,MulticastGroup> MulticastChannel;
// A membership in a multicast channel, an address and time of last LIKE
typedef std::pair<Address,uint64_t> MulticastMembership;
// Network : MulticastGroup -> vector<Address : time of last LIKE>
std::map< MulticastChannel,std::vector<MulticastMembership> > _multicastMemberships;
Mutex _multicastMemberships_m;
}; };
} // namespace ZeroTier } // namespace ZeroTier

View file

@ -40,8 +40,9 @@ const char *Packet::verbString(Verb v)
case VERB_WHOIS: return "WHOIS"; case VERB_WHOIS: return "WHOIS";
case VERB_RENDEZVOUS: return "RENDEZVOUS"; case VERB_RENDEZVOUS: return "RENDEZVOUS";
case VERB_FRAME: return "FRAME"; case VERB_FRAME: return "FRAME";
case VERB_MULTICAST_FRAME: return "MULTICAST_FRAME";
case VERB_MULTICAST_LIKE: return "MULTICAST_LIKE"; case VERB_MULTICAST_LIKE: return "MULTICAST_LIKE";
case VERB_MULTICAST_GOT: return "MULTICAST_GOT";
case VERB_MULTICAST_FRAME: return "MULTICAST_FRAME";
case VERB_NETWORK_MEMBERSHIP_CERTIFICATE: return "NETWORK_MEMBERSHIP_CERTIFICATE"; case VERB_NETWORK_MEMBERSHIP_CERTIFICATE: return "NETWORK_MEMBERSHIP_CERTIFICATE";
case VERB_NETWORK_CONFIG_REQUEST: return "NETWORK_CONFIG_REQUEST"; case VERB_NETWORK_CONFIG_REQUEST: return "NETWORK_CONFIG_REQUEST";
case VERB_NETWORK_CONFIG_REFRESH: return "NETWORK_CONFIG_REFRESH"; case VERB_NETWORK_CONFIG_REFRESH: return "NETWORK_CONFIG_REFRESH";

View file

@ -466,6 +466,7 @@ public:
VERB_MULTICAST_LIKE = 7, VERB_MULTICAST_LIKE = 7,
/* Announce receipt of a multicast to propagation magnet node: /* Announce receipt of a multicast to propagation magnet node:
* <[8] 64-bit network ID>
* <[8] 64-bit multicast GUID> * <[8] 64-bit multicast GUID>
* *
* OK/ERROR are not generated. * OK/ERROR are not generated.

View file

@ -42,7 +42,6 @@ class Demarc;
class Switch; class Switch;
class Topology; class Topology;
class SysEnv; class SysEnv;
class Multicaster;
class CMWC4096; class CMWC4096;
class Service; class Service;
class Node; class Node;
@ -66,7 +65,6 @@ public:
shutdownInProgress(false), shutdownInProgress(false),
log((Logger *)0), log((Logger *)0),
prng((CMWC4096 *)0), prng((CMWC4096 *)0),
multicaster((Multicaster *)0),
sw((Switch *)0), sw((Switch *)0),
demarc((Demarc *)0), demarc((Demarc *)0),
topology((Topology *)0), topology((Topology *)0),
@ -92,7 +90,6 @@ public:
Logger *log; // may be null Logger *log; // may be null
CMWC4096 *prng; CMWC4096 *prng;
Multicaster *multicaster;
Switch *sw; Switch *sw;
Demarc *demarc; Demarc *demarc;
Topology *topology; Topology *topology;