Add support for designated multicast replicators to Multicaster::send().

This commit is contained in:
Adam Ierymenko 2018-01-26 21:19:51 -05:00
parent f03fd57997
commit d3d58ba8a7
4 changed files with 58 additions and 4 deletions

View file

@ -161,15 +161,57 @@ void Multicaster::send(
void *tPtr, void *tPtr,
int64_t now, int64_t now,
const SharedPtr<Network> &network, const SharedPtr<Network> &network,
const Address &origin,
const MulticastGroup &mg, const MulticastGroup &mg,
const MAC &src, const MAC &src,
unsigned int etherType, unsigned int etherType,
const void *data, const void *data,
unsigned int len) unsigned int len)
{ {
unsigned long idxbuf[8194]; unsigned long idxbuf[4096];
unsigned long *indexes = idxbuf; unsigned long *indexes = idxbuf;
// If we're in hub-and-spoke designated multicast replication mode, see if we
// have a multicast replicator active. If so, pick the best and send it
// there. If we are a multicast replicator or if none are alive, fall back
// to sender replication.
{
Address multicastReplicators[ZT_MAX_NETWORK_SPECIALISTS];
const unsigned int multicastReplicatorCount = network->config().multicastReplicators(multicastReplicators);
if (multicastReplicatorCount) {
if (std::find(multicastReplicators,multicastReplicators + multicastReplicatorCount,RR->identity.address()) == (multicastReplicators + multicastReplicatorCount)) {
SharedPtr<Peer> bestMulticastReplicator;
SharedPtr<Path> bestMulticastReplicatorPath;
unsigned int bestMulticastReplicatorLatency = 0xffff;
for(unsigned int i=0;i<multicastReplicatorCount;++i) {
const SharedPtr<Peer> p(RR->topology->getPeerNoCache(multicastReplicators[i]));
if ((p)&&(p->isAlive(now))) {
const SharedPtr<Path> pp(p->getBestPath(now,false));
if ((pp)&&(pp->latency() < bestMulticastReplicatorLatency)) {
bestMulticastReplicatorLatency = pp->latency();
bestMulticastReplicatorPath = pp;
bestMulticastReplicator = p;
}
}
}
if (bestMulticastReplicator) {
Packet outp(bestMulticastReplicator->address(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
outp.append((uint64_t)network->id());
outp.append((uint8_t)0x04); // includes source MAC
((src) ? src : MAC(RR->identity.address(),network->id())).appendTo(outp);
mg.mac().appendTo(outp);
outp.append((uint32_t)mg.adi());
outp.append((uint16_t)etherType);
outp.append(data,len);
if (!network->config().disableCompression()) outp.compress();
outp.armor(bestMulticastReplicator->key(),true);
bestMulticastReplicatorPath->send(RR,tPtr,outp.data(),outp.size(),now);
return;
}
}
}
}
try { try {
Mutex::Lock _l(_groups_m); Mutex::Lock _l(_groups_m);
MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)]; MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)];
@ -214,7 +256,7 @@ void Multicaster::send(
unsigned int count = 0; unsigned int count = 0;
for(unsigned int i=0;i<activeBridgeCount;++i) { for(unsigned int i=0;i<activeBridgeCount;++i) {
if (activeBridges[i] != RR->identity.address()) { if ((activeBridges[i] != RR->identity.address())&&(activeBridges[i] != origin)) {
out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send
if (++count >= limit) if (++count >= limit)
break; break;
@ -224,7 +266,7 @@ void Multicaster::send(
unsigned long idx = 0; unsigned long idx = 0;
while ((count < limit)&&(idx < gs.members.size())) { while ((count < limit)&&(idx < gs.members.size())) {
const Address ma(gs.members[indexes[idx++]].address); const Address ma(gs.members[indexes[idx++]].address);
if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) { if ((std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount))&&(ma != origin)) {
out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send
++count; ++count;
} }

View file

@ -132,6 +132,7 @@ public:
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param now Current time * @param now Current time
* @param network Network * @param network Network
* @param origin Origin of multicast (to not return to sender) or NULL if none
* @param mg Multicast group * @param mg Multicast group
* @param src Source Ethernet MAC address or NULL to skip in packet and compute from ZT address (non-bridged mode) * @param src Source Ethernet MAC address or NULL to skip in packet and compute from ZT address (non-bridged mode)
* @param etherType Ethernet frame type * @param etherType Ethernet frame type
@ -142,6 +143,7 @@ public:
void *tPtr, void *tPtr,
int64_t now, int64_t now,
const SharedPtr<Network> &network, const SharedPtr<Network> &network,
const Address &origin,
const MulticastGroup &mg, const MulticastGroup &mg,
const MAC &src, const MAC &src,
unsigned int etherType, unsigned int etherType,

View file

@ -322,6 +322,16 @@ public:
return r; return r;
} }
inline unsigned int multicastReplicators(Address mr[ZT_MAX_NETWORK_SPECIALISTS]) const
{
unsigned int c = 0;
for(unsigned int i=0;i<specialistCount;++i) {
if ((specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR) != 0)
mr[c++] = specialists[i];
}
return c;
}
inline std::vector<Address> alwaysContactAddresses() const inline std::vector<Address> alwaysContactAddresses() const
{ {
std::vector<Address> r; std::vector<Address> r;

View file

@ -391,7 +391,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
tPtr, tPtr,
RR->node->now(), RR->node->now(),
network, network,
// network->config().activeBridges(), Address(),
multicastGroup, multicastGroup,
(fromBridged) ? from : MAC(), (fromBridged) ? from : MAC(),
etherType, etherType,