Plumb through attaching network ID to packet sends.

This commit is contained in:
Adam Ierymenko 2025-07-16 11:55:00 -04:00
parent 58c80ff0ab
commit 055be92ef0
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
11 changed files with 76 additions and 52 deletions

View file

@ -13,6 +13,7 @@
#include "Bond.hpp" #include "Bond.hpp"
#include "Constants.hpp"
#include "Node.hpp" #include "Node.hpp"
#include "Switch.hpp" #include "Switch.hpp"
@ -899,7 +900,7 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
} }
else { else {
RR->sw->send(tPtr, outp, false); RR->sw->send(tPtr, outp, false, 0, ZT_QOS_NO_FLOW);
} }
Metrics::pkt_qos_out++; Metrics::pkt_qos_out++;
_paths[pathIdx].packetsReceivedSinceLastQoS = 0; _paths[pathIdx].packetsReceivedSinceLastQoS = 0;

View file

@ -13,6 +13,7 @@
#include "Membership.hpp" #include "Membership.hpp"
#include "Constants.hpp"
#include "Node.hpp" #include "Node.hpp"
#include "Packet.hpp" #include "Packet.hpp"
#include "Peer.hpp" #include "Peer.hpp"
@ -93,7 +94,7 @@ void Membership::pushCredentials(const RuntimeEnvironment* RR, void* tPtr, const
outp.setAt(cooCountAt, (uint16_t)thisPacketCooCount); outp.setAt(cooCountAt, (uint16_t)thisPacketCooCount);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, nconf.networkId, ZT_QOS_NO_FLOW);
Metrics::pkt_network_credentials_out++; Metrics::pkt_network_credentials_out++;
} }

View file

@ -272,7 +272,7 @@ void Multicaster::send(void* tPtr, int64_t now, const SharedPtr<Network>& networ
Address explicitGatherPeers[16]; Address explicitGatherPeers[16];
unsigned int numExplicitGatherPeers = 0; unsigned int numExplicitGatherPeers = 0;
SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer()); SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer(network->id()));
if (bestRoot) { if (bestRoot) {
explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address(); explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address();
} }
@ -312,7 +312,7 @@ void Multicaster::send(void* tPtr, int64_t now, const SharedPtr<Network>& networ
com->serialize(outp); com->serialize(outp);
} }
RR->node->expectReplyTo(outp.packetId()); RR->node->expectReplyTo(outp.packetId());
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, network->id(), ZT_QOS_NO_FLOW);
Metrics::pkt_multicast_gather_out++; Metrics::pkt_multicast_gather_out++;
} }
} }

View file

@ -31,7 +31,6 @@
#include "Trace.hpp" #include "Trace.hpp"
#include <math.h> #include <math.h>
#include <set>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -808,7 +807,7 @@ bool Network::filterOutgoingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, ccLength2); outp.append(frameData, ccLength2);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
break; break;
@ -846,7 +845,7 @@ bool Network::filterOutgoingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, ccLength); outp.append(frameData, ccLength);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
if ((ztDest != ztFinalDest) && (ztFinalDest)) { if ((ztDest != ztFinalDest) && (ztFinalDest)) {
@ -858,7 +857,7 @@ bool Network::filterOutgoingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, frameLen); outp.append(frameData, frameLen);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
if (_config.remoteTraceTarget) { if (_config.remoteTraceTarget) {
RR->t->networkFilter( RR->t->networkFilter(
@ -985,7 +984,7 @@ int Network::filterIncomingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, ccLength2); outp.append(frameData, ccLength2);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
break; break;
} }
@ -1018,7 +1017,7 @@ int Network::filterIncomingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, ccLength); outp.append(frameData, ccLength);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
if ((ztDest != ztFinalDest) && (ztFinalDest)) { if ((ztDest != ztFinalDest) && (ztFinalDest)) {
@ -1030,7 +1029,7 @@ int Network::filterIncomingPacket(
outp.append((uint16_t)etherType); outp.append((uint16_t)etherType);
outp.append(frameData, frameLen); outp.append(frameData, frameLen);
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
if (_config.remoteTraceTarget) { if (_config.remoteTraceTarget) {
RR->t->networkFilter(tPtr, *this, rrl, (c) ? &crrl : (Trace::RuleResultLog*)0, c, sourcePeer->address(), ztDest, macSource, macDest, frameData, frameLen, etherType, vlanId, false, true, 0); RR->t->networkFilter(tPtr, *this, rrl, (c) ? &crrl : (Trace::RuleResultLog*)0, c, sourcePeer->address(), ztDest, macSource, macDest, frameData, frameLen, etherType, vlanId, false, true, 0);
@ -1160,7 +1159,7 @@ uint64_t Network::handleConfigChunk(void* tPtr, const uint64_t packetId, const A
if ((*a != source) && (*a != controller())) { if ((*a != source) && (*a != controller())) {
Packet outp(*a, RR->identity.address(), Packet::VERB_NETWORK_CONFIG); Packet outp(*a, RR->identity.address(), Packet::VERB_NETWORK_CONFIG);
outp.append(reinterpret_cast<const uint8_t*>(chunk.data()) + start, chunk.size() - start); outp.append(reinterpret_cast<const uint8_t*>(chunk.data()) + start, chunk.size() - start);
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
} }
} }
@ -1471,7 +1470,7 @@ void Network::requestConfiguration(void* tPtr)
} }
outp.compress(); outp.compress();
RR->node->expectReplyTo(outp.packetId()); RR->node->expectReplyTo(outp.packetId());
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
bool Network::gate(void* tPtr, const SharedPtr<Peer>& peer) bool Network::gate(void* tPtr, const SharedPtr<Peer>& peer)
@ -1628,7 +1627,7 @@ Membership::AddCredentialResult Network::addCredential(void* tPtr, const Address
outp.append((uint16_t)1); // one revocation! outp.append((uint16_t)1); // one revocation!
rev.serialize(outp); rev.serialize(outp);
outp.append((uint16_t)0); // no certificates of ownership outp.append((uint16_t)0); // no certificates of ownership
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, _id, ZT_QOS_NO_FLOW);
} }
} }
} }
@ -1798,7 +1797,7 @@ void Network::_announceMulticastGroupsTo(void* tPtr, const Address& peer, const
for (std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin()); mg != allMulticastGroups.end(); ++mg) { for (std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin()); mg != allMulticastGroups.end(); ++mg) {
if ((outp->size() + 24) >= ZT_PROTO_MAX_PACKET_LENGTH) { if ((outp->size() + 24) >= ZT_PROTO_MAX_PACKET_LENGTH) {
outp->compress(); outp->compress();
RR->sw->send(tPtr, *outp, true); RR->sw->send(tPtr, *outp, true, _id, ZT_QOS_NO_FLOW);
outp->reset(peer, RR->identity.address(), Packet::VERB_MULTICAST_LIKE); outp->reset(peer, RR->identity.address(), Packet::VERB_MULTICAST_LIKE);
} }
@ -1810,7 +1809,7 @@ void Network::_announceMulticastGroupsTo(void* tPtr, const Address& peer, const
if (outp->size() > ZT_PROTO_MIN_PACKET_LENGTH) { if (outp->size() > ZT_PROTO_MIN_PACKET_LENGTH) {
outp->compress(); outp->compress();
RR->sw->send(tPtr, *outp, true); RR->sw->send(tPtr, *outp, true, _id, ZT_QOS_NO_FLOW);
} }
delete outp; delete outp;

View file

@ -377,6 +377,28 @@ class NetworkConfig {
return false; return false;
} }
inline std::vector<Address> alwaysContactAddresses() const
{
std::vector<Address> r;
for (unsigned int i = 0; i < specialistCount; ++i) {
if ((specialists[i] & (ZT_NETWORKCONFIG_SPECIALIST_TYPE_NETWORK_RELAY | ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR)) != 0) {
r.push_back(Address(specialists[i]));
}
}
return r;
}
inline unsigned int alwaysContactAddresses(Address ac[ZT_MAX_NETWORK_SPECIALISTS]) const
{
unsigned int c = 0;
for (unsigned int i = 0; i < specialistCount; ++i) {
if ((specialists[i] & (ZT_NETWORKCONFIG_SPECIALIST_TYPE_NETWORK_RELAY | ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR)) != 0) {
ac[c++] = specialists[i];
}
}
return c;
}
inline void alwaysContactAddresses(Hashtable<Address, std::vector<InetAddress> >& a) const inline void alwaysContactAddresses(Hashtable<Address, std::vector<InetAddress> >& a) const
{ {
for (unsigned int i = 0; i < specialistCount; ++i) { for (unsigned int i = 0; i < specialistCount; ++i) {

View file

@ -699,7 +699,7 @@ int Node::sendUserMessage(void* tptr, uint64_t dest, uint64_t typeId, const void
outp.append(typeId); outp.append(typeId);
outp.append(data, len); outp.append(data, len);
outp.compress(); outp.compress();
RR->sw->send(tptr, outp, true); RR->sw->send(tptr, outp, true, 0, ZT_QOS_NO_FLOW);
return 1; return 1;
} }
} }
@ -825,7 +825,7 @@ void Node::ncSendConfig(uint64_t nwid, uint64_t requestPacketId, const Address&
outp.append(sig.data, ZT_ECC_SIGNATURE_LEN); outp.append(sig.data, ZT_ECC_SIGNATURE_LEN);
outp.compress(); outp.compress();
RR->sw->send((void*)0, outp, true); RR->sw->send((void*)0, outp, true, nwid, ZT_QOS_NO_FLOW);
chunkIndex += chunkLen; chunkIndex += chunkLen;
} }
} }
@ -855,7 +855,7 @@ void Node::ncSendRevocation(const Address& destination, const Revocation& rev)
outp.append((uint16_t)1); outp.append((uint16_t)1);
rev.serialize(outp); rev.serialize(outp);
outp.append((uint16_t)0); outp.append((uint16_t)0);
RR->sw->send((void*)0, outp, true); RR->sw->send((void*)0, outp, true, rev.networkId(), ZT_QOS_NO_FLOW);
} }
} }
@ -911,7 +911,7 @@ void Node::ncSendError(uint64_t nwid, uint64_t requestPacketId, const Address& d
outp.append(errorData, errorDataSize); outp.append(errorData, errorDataSize);
} }
RR->sw->send((void*)0, outp, true); RR->sw->send((void*)0, outp, true, nwid, ZT_QOS_NO_FLOW);
} // else we can't send an ERROR() in response to nothing, so discard } // else we can't send an ERROR() in response to nothing, so discard
} }

View file

@ -16,7 +16,6 @@
#include "Constants.hpp" #include "Constants.hpp"
#include "Network.hpp" #include "Network.hpp"
#include "Node.hpp" #include "Node.hpp"
#include "Peer.hpp"
#include "RuntimeEnvironment.hpp" #include "RuntimeEnvironment.hpp"
#include "Switch.hpp" #include "Switch.hpp"
#include "Topology.hpp" #include "Topology.hpp"
@ -87,7 +86,7 @@ void OutboundMulticast::sendOnly(const RuntimeEnvironment* RR, void* tPtr, const
_packet.setDestination(toAddr); _packet.setDestination(toAddr);
RR->node->expectReplyTo(_packet.packetId()); RR->node->expectReplyTo(_packet.packetId());
_tmp = _packet; _tmp = _packet;
RR->sw->send(tPtr, _tmp, true); RR->sw->send(tPtr, _tmp, true, _nwid, ZT_QOS_NO_FLOW);
} }
} }

View file

@ -470,7 +470,7 @@ void Peer::sendHELLO(void* tPtr, const int64_t localSocket, const InetAddress& a
} }
else { else {
RR->node->expectReplyTo(outp.packetId()); RR->node->expectReplyTo(outp.packetId());
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, 0, ZT_QOS_NO_FLOW);
} }
} }

View file

@ -83,6 +83,7 @@ void Switch::onRemotePacket(void* tPtr, const int64_t localSocket, const InetAdd
const Address destination(fragment.destination()); const Address destination(fragment.destination());
if (destination != RR->identity.address()) { if (destination != RR->identity.address()) {
// Fragment is someone else's.
if ((! RR->topology->amUpstream()) && (! path->trustEstablished(now))) { if ((! RR->topology->amUpstream()) && (! path->trustEstablished(now))) {
return; return;
} }
@ -95,7 +96,7 @@ void Switch::onRemotePacket(void* tPtr, const int64_t localSocket, const InetAdd
SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr, destination); SharedPtr<Peer> relayTo = RR->topology->getPeer(tPtr, destination);
if ((! relayTo) || (! relayTo->sendDirect(tPtr, fragment.data(), fragment.size(), now, false))) { if ((! relayTo) || (! relayTo->sendDirect(tPtr, fragment.data(), fragment.size(), now, false))) {
// Don't know peer or no direct path -- so relay via someone upstream // Don't know peer or no direct path -- so relay via someone upstream
relayTo = RR->topology->getUpstreamPeer(); relayTo = RR->topology->getUpstreamPeer(0);
if (relayTo) { if (relayTo) {
relayTo->sendDirect(tPtr, fragment.data(), fragment.size(), now, true); relayTo->sendDirect(tPtr, fragment.data(), fragment.size(), now, true);
} }
@ -164,6 +165,7 @@ void Switch::onRemotePacket(void* tPtr, const int64_t localSocket, const InetAdd
} }
if (destination != RR->identity.address()) { if (destination != RR->identity.address()) {
// Not our packet head.
if ((! RR->topology->amUpstream()) && (! path->trustEstablished(now)) && (source != RR->identity.address())) { if ((! RR->topology->amUpstream()) && (! path->trustEstablished(now)) && (source != RR->identity.address())) {
return; return;
} }
@ -182,7 +184,7 @@ void Switch::onRemotePacket(void* tPtr, const int64_t localSocket, const InetAdd
} }
} }
else { else {
relayTo = RR->topology->getUpstreamPeer(); relayTo = RR->topology->getUpstreamPeer(0);
if ((relayTo) && (relayTo->address() != source)) { if ((relayTo) && (relayTo->address() != source)) {
if (relayTo->sendDirect(tPtr, packet.data(), packet.size(), now, true)) { if (relayTo->sendDirect(tPtr, packet.data(), packet.size(), now, true)) {
const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(tPtr, source)); const SharedPtr<Peer> sourcePeer(RR->topology->getPeer(tPtr, source));
@ -550,7 +552,7 @@ void Switch::onLocalEthernet(void* tPtr, const SharedPtr<Network>& network, cons
// 1.4.8: disable compression for unicast as it almost never helps // 1.4.8: disable compression for unicast as it almost never helps
// if (!network->config().disableCompression()) // if (!network->config().disableCompression())
// outp.compress(); // outp.compress();
aqm_enqueue(tPtr, network, outp, true, qosBucket, flowId); aqm_enqueue(tPtr, network, outp, true, qosBucket, network->id(), flowId);
} }
else { else {
Packet outp(toZT, RR->identity.address(), Packet::VERB_EXT_FRAME); Packet outp(toZT, RR->identity.address(), Packet::VERB_EXT_FRAME);
@ -563,7 +565,7 @@ void Switch::onLocalEthernet(void* tPtr, const SharedPtr<Network>& network, cons
// 1.4.8: disable compression for unicast as it almost never helps // 1.4.8: disable compression for unicast as it almost never helps
// if (!network->config().disableCompression()) // if (!network->config().disableCompression())
// outp.compress(); // outp.compress();
aqm_enqueue(tPtr, network, outp, true, qosBucket, flowId); aqm_enqueue(tPtr, network, outp, true, qosBucket, network->id(), flowId);
} }
} }
else { else {
@ -627,7 +629,7 @@ void Switch::onLocalEthernet(void* tPtr, const SharedPtr<Network>& network, cons
// 1.4.8: disable compression for unicast as it almost never helps // 1.4.8: disable compression for unicast as it almost never helps
// if (!network->config().disableCompression()) // if (!network->config().disableCompression())
// outp.compress(); // outp.compress();
aqm_enqueue(tPtr, network, outp, true, qosBucket, flowId); aqm_enqueue(tPtr, network, outp, true, qosBucket, network->id(), flowId);
} }
else { else {
RR->t->outgoingNetworkFrameDropped(tPtr, network, from, to, etherType, vlanId, len, "filter blocked (bridge replication)"); RR->t->outgoingNetworkFrameDropped(tPtr, network, from, to, etherType, vlanId, len, "filter blocked (bridge replication)");
@ -636,10 +638,10 @@ void Switch::onLocalEthernet(void* tPtr, const SharedPtr<Network>& network, cons
} }
} }
void Switch::aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet& packet, bool encrypt, int qosBucket, int32_t flowId) void Switch::aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet& packet, const bool encrypt, const int qosBucket, const uint64_t nwid, const int32_t flowId)
{ {
if (! network->qosEnabled()) { if (! network->qosEnabled()) {
send(tPtr, packet, encrypt, flowId); send(tPtr, packet, encrypt, nwid, flowId);
return; return;
} }
NetworkQoSControlBlock* nqcb = _netQueueControlBlock[network->id()]; NetworkQoSControlBlock* nqcb = _netQueueControlBlock[network->id()];
@ -654,7 +656,7 @@ void Switch::aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet&
} }
// Don't apply QoS scheduling to ZT protocol traffic // Don't apply QoS scheduling to ZT protocol traffic
if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) { if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
send(tPtr, packet, encrypt, flowId); send(tPtr, packet, encrypt, nwid, flowId);
} }
_aqm_m.lock(); _aqm_m.lock();
@ -662,7 +664,7 @@ void Switch::aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet&
// Enqueue packet and move queue to appropriate list // Enqueue packet and move queue to appropriate list
const Address dest(packet.destination()); const Address dest(packet.destination());
TXQueueEntry* txEntry = new TXQueueEntry(dest, RR->node->now(), packet, encrypt, flowId); TXQueueEntry* txEntry = new TXQueueEntry(dest, nwid, RR->node->now(), packet, encrypt, flowId);
ManagedQueue* selectedQueue = nullptr; ManagedQueue* selectedQueue = nullptr;
for (size_t i = 0; i < ZT_AQM_NUM_BUCKETS; i++) { for (size_t i = 0; i < ZT_AQM_NUM_BUCKETS; i++) {
@ -842,7 +844,7 @@ void Switch::aqm_dequeue(void* tPtr)
queueAtFrontOfList->byteCredit -= len; queueAtFrontOfList->byteCredit -= len;
// Send the packet! // Send the packet!
queueAtFrontOfList->q.pop_front(); queueAtFrontOfList->q.pop_front();
send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId); send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->nwid, entryToEmit->flowId);
(*nqcb).second->_currEnqueuedPackets--; (*nqcb).second->_currEnqueuedPackets--;
} }
if (queueAtFrontOfList) { if (queueAtFrontOfList) {
@ -875,7 +877,7 @@ void Switch::aqm_dequeue(void* tPtr)
queueAtFrontOfList->byteLength -= len; queueAtFrontOfList->byteLength -= len;
queueAtFrontOfList->byteCredit -= len; queueAtFrontOfList->byteCredit -= len;
queueAtFrontOfList->q.pop_front(); queueAtFrontOfList->q.pop_front();
send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId); send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->nwid, entryToEmit->flowId);
(*nqcb).second->_currEnqueuedPackets--; (*nqcb).second->_currEnqueuedPackets--;
} }
if (queueAtFrontOfList) { if (queueAtFrontOfList) {
@ -899,20 +901,20 @@ void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
} }
} }
void Switch::send(void* tPtr, Packet& packet, bool encrypt, int32_t flowId) void Switch::send(void* tPtr, Packet& packet, const bool encrypt, const uint64_t nwid, const int32_t flowId)
{ {
const Address dest(packet.destination()); const Address dest(packet.destination());
if (dest == RR->identity.address()) { if (dest == RR->identity.address()) {
return; return;
} }
_recordOutgoingPacketMetrics(packet); _recordOutgoingPacketMetrics(packet);
if (! _trySend(tPtr, packet, encrypt, flowId)) { if (! _trySend(tPtr, packet, encrypt, nwid, flowId)) {
{ {
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
if (_txQueue.size() >= ZT_TX_QUEUE_SIZE) { if (_txQueue.size() >= ZT_TX_QUEUE_SIZE) {
_txQueue.pop_front(); _txQueue.pop_front();
} }
_txQueue.push_back(TXQueueEntry(dest, RR->node->now(), packet, encrypt, flowId)); _txQueue.push_back(TXQueueEntry(dest, nwid, RR->node->now(), packet, encrypt, flowId));
} }
if (! RR->topology->getPeer(tPtr, dest)) { if (! RR->topology->getPeer(tPtr, dest)) {
requestWhois(tPtr, RR->node->now(), dest); requestWhois(tPtr, RR->node->now(), dest);
@ -937,12 +939,12 @@ void Switch::requestWhois(void* tPtr, const int64_t now, const Address& addr)
} }
} }
const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer()); const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(0));
if (upstream) { if (upstream) {
int32_t flowId = ZT_QOS_NO_FLOW; int32_t flowId = ZT_QOS_NO_FLOW;
Packet outp(upstream->address(), RR->identity.address(), Packet::VERB_WHOIS); Packet outp(upstream->address(), RR->identity.address(), Packet::VERB_WHOIS);
addr.appendTo(outp); addr.appendTo(outp);
send(tPtr, outp, true, flowId); send(tPtr, outp, true, 0, flowId);
} }
} }
@ -968,7 +970,7 @@ void Switch::doAnythingWaitingForPeer(void* tPtr, const SharedPtr<Peer>& peer)
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
for (std::list<TXQueueEntry>::iterator txi(_txQueue.begin()); txi != _txQueue.end();) { for (std::list<TXQueueEntry>::iterator txi(_txQueue.begin()); txi != _txQueue.end();) {
if (txi->dest == peer->address()) { if (txi->dest == peer->address()) {
if (_trySend(tPtr, txi->packet, txi->encrypt, txi->flowId)) { if (_trySend(tPtr, txi->packet, txi->encrypt, 0, txi->flowId)) {
_txQueue.erase(txi++); _txQueue.erase(txi++);
} }
else { else {
@ -995,7 +997,7 @@ unsigned long Switch::doTimerTasks(void* tPtr, int64_t now)
Mutex::Lock _l(_txQueue_m); Mutex::Lock _l(_txQueue_m);
for (std::list<TXQueueEntry>::iterator txi(_txQueue.begin()); txi != _txQueue.end();) { for (std::list<TXQueueEntry>::iterator txi(_txQueue.begin()); txi != _txQueue.end();) {
if (_trySend(tPtr, txi->packet, txi->encrypt, txi->flowId)) { if (_trySend(tPtr, txi->packet, txi->encrypt, 0, txi->flowId)) {
_txQueue.erase(txi++); _txQueue.erase(txi++);
} }
else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) { else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
@ -1067,7 +1069,7 @@ bool Switch::_shouldUnite(const int64_t now, const Address& source, const Addres
return false; return false;
} }
bool Switch::_trySend(void* tPtr, Packet& packet, bool encrypt, int32_t flowId) bool Switch::_trySend(void* tPtr, Packet& packet, bool encrypt, const uint64_t nwid, const int32_t flowId)
{ {
SharedPtr<Path> viaPath; SharedPtr<Path> viaPath;
const int64_t now = RR->node->now(); const int64_t now = RR->node->now();
@ -1076,7 +1078,7 @@ bool Switch::_trySend(void* tPtr, Packet& packet, bool encrypt, int32_t flowId)
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr, destination)); const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr, destination));
if (peer) { if (peer) {
if ((peer->bondingPolicy() == ZT_BOND_POLICY_BROADCAST) && (packet.verb() == Packet::VERB_FRAME || packet.verb() == Packet::VERB_EXT_FRAME)) { if ((peer->bondingPolicy() == ZT_BOND_POLICY_BROADCAST) && (packet.verb() == Packet::VERB_FRAME || packet.verb() == Packet::VERB_EXT_FRAME)) {
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer()); const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer(nwid));
Mutex::Lock _l(peer->_paths_m); Mutex::Lock _l(peer->_paths_m);
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
if (peer->_paths[i].p && peer->_paths[i].p->alive(now)) { if (peer->_paths[i].p && peer->_paths[i].p->alive(now)) {
@ -1090,7 +1092,7 @@ bool Switch::_trySend(void* tPtr, Packet& packet, bool encrypt, int32_t flowId)
viaPath = peer->getAppropriatePath(now, false, flowId); viaPath = peer->getAppropriatePath(now, false, flowId);
if (! viaPath) { if (! viaPath) {
peer->tryMemorizedPath(tPtr, now); // periodically attempt memorized or statically defined paths, if any are known peer->tryMemorizedPath(tPtr, now); // periodically attempt memorized or statically defined paths, if any are known
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer()); const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer(nwid));
if ((! relay) || (! (viaPath = relay->getAppropriatePath(now, false, flowId)))) { if ((! relay) || (! (viaPath = relay->getAppropriatePath(now, false, flowId)))) {
if (! (viaPath = peer->getAppropriatePath(now, true, flowId))) { if (! (viaPath = peer->getAppropriatePath(now, true, flowId))) {
return false; return false;

View file

@ -24,11 +24,9 @@
#include "Packet.hpp" #include "Packet.hpp"
#include "SharedPtr.hpp" #include "SharedPtr.hpp"
#include "Topology.hpp" #include "Topology.hpp"
#include "Utils.hpp"
#include <list> #include <list>
#include <map> #include <map>
#include <set>
#include <vector> #include <vector>
/* Ethernet frame types that might be relevant to us */ /* Ethernet frame types that might be relevant to us */
@ -124,7 +122,7 @@ class Switch {
* @param encrypt Encrypt packet payload? (always true except for HELLO) * @param encrypt Encrypt packet payload? (always true except for HELLO)
* @param qosBucket Which bucket the rule-system determined this packet should fall into * @param qosBucket Which bucket the rule-system determined this packet should fall into
*/ */
void aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet& packet, bool encrypt, int qosBucket, int32_t flowId = ZT_QOS_NO_FLOW); void aqm_enqueue(void* tPtr, const SharedPtr<Network>& network, Packet& packet, const bool encrypt, const int qosBucket, const uint64_t nwid, const int32_t flowId /* = ZT_QOS_NO_FLOW*/);
/** /**
* Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
@ -169,8 +167,9 @@ class Switch {
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param packet Packet to send (buffer may be modified) * @param packet Packet to send (buffer may be modified)
* @param encrypt Encrypt packet payload? (always true except for HELLO) * @param encrypt Encrypt packet payload? (always true except for HELLO)
* @param nwid Network ID to which this packet is related or 0 if none
*/ */
void send(void* tPtr, Packet& packet, bool encrypt, int32_t flowId = ZT_QOS_NO_FLOW); void send(void* tPtr, Packet& packet, const bool encrypt, const uint64_t nwid, const int32_t flowId /* = ZT_QOS_NO_FLOW*/);
/** /**
* Request WHOIS on a given address * Request WHOIS on a given address
@ -205,7 +204,7 @@ class Switch {
private: private:
bool _shouldUnite(const int64_t now, const Address& source, const Address& destination); bool _shouldUnite(const int64_t now, const Address& source, const Address& destination);
bool _trySend(void* tPtr, Packet& packet, bool encrypt, int32_t flowId = ZT_QOS_NO_FLOW); // packet is modified if return is true bool _trySend(void* tPtr, Packet& packet, bool encrypt, const uint64_t nwid, const int32_t flowId /* = ZT_QOS_NO_FLOW*/);
void _sendViaSpecificPath(void* tPtr, SharedPtr<Peer> peer, SharedPtr<Path> viaPath, uint16_t userSpecifiedMtu, int64_t now, Packet& packet, bool encrypt, int32_t flowId); void _sendViaSpecificPath(void* tPtr, SharedPtr<Peer> peer, SharedPtr<Path> viaPath, uint16_t userSpecifiedMtu, int64_t now, Packet& packet, bool encrypt, int32_t flowId);
void _recordOutgoingPacketMetrics(const Packet& p); void _recordOutgoingPacketMetrics(const Packet& p);
@ -260,11 +259,12 @@ class Switch {
TXQueueEntry() TXQueueEntry()
{ {
} }
TXQueueEntry(Address d, uint64_t ct, const Packet& p, bool enc, int32_t fid) : dest(d), creationTime(ct), packet(p), encrypt(enc), flowId(fid) TXQueueEntry(Address d, uint64_t nwid, uint64_t ct, const Packet& p, bool enc, int32_t fid) : dest(d), nwid(nwid), creationTime(ct), packet(p), encrypt(enc), flowId(fid)
{ {
} }
Address dest; Address dest;
uint64_t nwid;
uint64_t creationTime; uint64_t creationTime;
Packet packet; // unencrypted/unMAC'd packet -- this is done at send time Packet packet; // unencrypted/unMAC'd packet -- this is done at send time
bool encrypt; bool encrypt;

View file

@ -19,13 +19,13 @@
#include "Capability.hpp" #include "Capability.hpp"
#include "CertificateOfMembership.hpp" #include "CertificateOfMembership.hpp"
#include "CertificateOfOwnership.hpp" #include "CertificateOfOwnership.hpp"
#include "Constants.hpp"
#include "Dictionary.hpp" #include "Dictionary.hpp"
#include "Node.hpp" #include "Node.hpp"
#include "Revocation.hpp" #include "Revocation.hpp"
#include "RuntimeEnvironment.hpp" #include "RuntimeEnvironment.hpp"
#include "Switch.hpp" #include "Switch.hpp"
#include "Tag.hpp" #include "Tag.hpp"
#include "Utils.hpp"
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h> #include <stdio.h>
@ -632,7 +632,7 @@ void Trace::_send(void* const tPtr, const Dictionary<ZT_MAX_REMOTE_TRACE_SIZE>&
Packet outp(dest, RR->identity.address(), Packet::VERB_REMOTE_TRACE); Packet outp(dest, RR->identity.address(), Packet::VERB_REMOTE_TRACE);
outp.appendCString(d.data()); outp.appendCString(d.data());
outp.compress(); outp.compress();
RR->sw->send(tPtr, outp, true); RR->sw->send(tPtr, outp, true, 0, ZT_QOS_NO_FLOW);
} }
void Trace::_spamToAllNetworks(void* const tPtr, const Dictionary<ZT_MAX_REMOTE_TRACE_SIZE>& d, const Level level) void Trace::_spamToAllNetworks(void* const tPtr, const Dictionary<ZT_MAX_REMOTE_TRACE_SIZE>& d, const Level level)