A few minor changes: (1) bind sockets to peers so the same socket is always used to send as most recently received, (2) pick the most recently active IP (v4 or v6) if both exist, and (3) introduce expiry of V4 or V6 IPs in addition to the peer overall.

This commit is contained in:
Adam Ierymenko 2021-05-26 19:37:48 -04:00
parent 32ca1a09da
commit 311f9c5c2a

View file

@ -126,14 +126,17 @@ using json = nlohmann::json;
*/
struct RootPeer
{
ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
ZT_ALWAYS_INLINE RootPeer() : v4s(-1),v6s(-1),lastSend(0),lastReceive(0),lastReceiveV4(0),lastReceiveV6(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
Identity id; // Identity
uint8_t key[32]; // Shared secret key
InetAddress ip4,ip6; // IPv4 and IPv6 addresses
int v4s, v6s; // IPv4 and IPv6 sockets
int64_t lastSend; // Time of last send (any packet)
int64_t lastReceive; // Time of last receive (any packet)
int64_t lastReceiveV4; // Time of last IPv4 receive
int64_t lastReceiveV6; // Time of last IPv6 receive
int64_t lastEcho; // Time of last received ECHO
int64_t lastHello; // Time of last received HELLO
int vProto; // Protocol version or -1 if unknown
@ -229,7 +232,7 @@ static ZT_ALWAYS_INLINE std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip
return i128;
}
static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip,Packet &pkt)
static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
{
char ipstr[128],ipstr2[128],astr[32],astr2[32],tmpstr[256];
const bool fragment = pkt[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR;
@ -329,18 +332,27 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
// If we found the peer, update IP and/or time and handle certain key packet types that the
// root must concern itself with.
if (peer) {
if (ip->isV4())
peer->ip4 = ip;
else if (ip->isV6())
peer->ip6 = ip;
const int64_t now = OSUtils::now();
if (ip->isV4()) {
peer->ip4 = ip;
peer->v4s = sock;
peer->lastReceiveV4 = now;
if ((now - peer->lastReceiveV6) > ZT_PEER_ACTIVITY_TIMEOUT)
peer->v6s = -1;
} else if (ip->isV6()) {
peer->ip6 = ip;
peer->v6s = sock;
peer->lastReceiveV6 = now;
if ((now - peer->lastReceiveV4) > ZT_PEER_ACTIVITY_TIMEOUT)
peer->v4s = -1;
}
peer->lastReceive = now;
switch(pkt.verb()) {
case Packet::VERB_HELLO:
try {
if ((now - peer->lastHello) > 500) {
if ((now - peer->lastHello) > 250) {
peer->lastHello = now;
peer->vProto = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION];
@ -367,7 +379,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
}
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size());
peer->lastSend = now;
@ -388,7 +400,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD);
outp.compress();
outp.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
sendto(sock,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,outp.size());
peer->lastSend = now;
@ -400,6 +412,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
case Packet::VERB_WHOIS:
try {
std::vector< SharedPtr<RootPeer> > results;
results.reserve(4);
{
std::lock_guard<std::mutex> l(s_peersByVirtAddr_l);
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) {
@ -419,7 +432,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
for(auto p=results.begin();p!=results.end();++p)
(*p)->id.serialize(pkt,false);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size());
peer->lastSend = now;
@ -479,7 +492,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (l > 0) {
pkt.setAt<uint16_t>(countAt,(uint16_t)l);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size());
peer->lastSend = now;
@ -531,14 +544,21 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
std::vector< std::pair< InetAddress *,SharedPtr<RootPeer> > > toAddrs;
toAddrs.reserve(4);
{
std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
auto peers = s_peersByVirtAddr.find(dest);
if (peers != s_peersByVirtAddr.end()) {
for(auto p=peers->second.begin();p!=peers->second.end();++p) {
if ((*p)->ip4) {
if (((*p)->v4s >= 0)&&((*p)->v6s >= 0)) {
if ((*p)->lastReceiveV4 > (*p)->lastReceiveV6) {
toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
} else {
toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
}
} else if ((*p)->v4s >= 0) {
toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
} else if ((*p)->ip6) {
} else if ((*p)->v6s >= 0) {
toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
}
}
@ -555,7 +575,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (sources != s_peersByVirtAddr.end()) {
for(auto a=sources->second.begin();a!=sources->second.end();++a) {
for(auto b=toAddrs.begin();b!=toAddrs.end();++b) {
if (((*a)->ip6)&&(b->second->ip6)) {
if (((*a)->v6s >= 0)&&(b->second->v6s >= 0)) {
//printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip6.toString(ipstr2),dest.toString(astr2));
// Introduce source to destination (V6)
@ -566,7 +586,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)16);
outp.append((const uint8_t *)(b->second->ip6.rawIpData()),16);
outp.armor((*a)->key,true);
sendto(v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6));
sendto((*a)->v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6));
s_outputRate.log(now,outp.size());
(*a)->lastSend = now;
@ -579,12 +599,12 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)16);
outp.append((const uint8_t *)((*a)->ip6.rawIpData()),16);
outp.armor(b->second->key,true);
sendto(v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6));
sendto(b->second->v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6));
s_outputRate.log(now,outp.size());
b->second->lastSend = now;
}
if (((*a)->ip4)&&(b->second->ip4)) {
if (((*a)->v4s >= 0)&&(b->second->v4s >= 0)) {
//printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip4.toString(ipstr2),dest.toString(astr2));
// Introduce source to destination (V4)
@ -595,7 +615,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)4);
outp.append((const uint8_t *)b->second->ip4.rawIpData(),4);
outp.armor((*a)->key,true);
sendto(v4s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in));
sendto((*a)->v4s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in));
s_outputRate.log(now,outp.size());
(*a)->lastSend = now;
@ -608,7 +628,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)4);
outp.append((const uint8_t *)((*a)->ip4.rawIpData()),4);
outp.armor(b->second->key,true);
sendto(v4s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in));
sendto(b->second->v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in));
s_outputRate.log(now,outp.size());
b->second->lastSend = now;
@ -619,7 +639,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
for(auto i=toAddrs.begin();i!=toAddrs.end();++i) {
if (sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) > 0) {
if (sendto(i->first->isV4() ? i->second->v4s : i->second->v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) > 0) {
s_outputRate.log(now,pkt.size());
s_forwardRate.log(now,pkt.size());
i->second->lastSend = now;
@ -639,16 +659,16 @@ static int bindSocket(struct sockaddr *const bindAddr)
}
int f = 16777216;
while (f > 131072) {
while (f > 65536) {
if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&f,sizeof(f)) == 0)
break;
f -= 131072;
f -= 65536;
}
f = 16777216;
while (f > 131072) {
while (f > 65536) {
if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&f,sizeof(f)) == 0)
break;
f -= 131072;
f -= 65536;
}
if (bindAddr->sa_family == AF_INET6) {
@ -913,7 +933,7 @@ int main(int argc,char **argv)
if ((pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH)&&(pl <= ZT_PROTO_MAX_PACKET_LENGTH)) {
try {
pkt->setSize((unsigned int)pl);
handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in6),*pkt);
handlePacket(s6,reinterpret_cast<const InetAddress *>(&in6),*pkt);
} catch (std::exception &exc) {
char ipstr[128];
printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in6)->toString(ipstr),exc.what());
@ -943,7 +963,7 @@ int main(int argc,char **argv)
if ((pl >= ZT_PROTO_MIN_FRAGMENT_LENGTH)&&(pl <= ZT_PROTO_MAX_PACKET_LENGTH)) {
try {
pkt->setSize((unsigned int)pl);
handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in4),*pkt);
handlePacket(s4,reinterpret_cast<const InetAddress *>(&in4),*pkt);
} catch (std::exception &exc) {
char ipstr[128];
printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in4)->toString(ipstr),exc.what());
@ -1021,7 +1041,7 @@ int main(int argc,char **argv)
"{\"address\":\"" << (*p)->id.address().toString(tmp) << "\""
",\"latency\":-1"
",\"paths\":[";
if ((*p)->ip4) {
if ((*p)->v4s >= 0) {
o <<
"{\"active\":true"
",\"address\":\"" << (*p)->ip4.toIpString(tmp) << "\\/" << (*p)->ip4.port() << "\""
@ -1031,8 +1051,8 @@ int main(int argc,char **argv)
",\"preferred\":true"
",\"trustedPathId\":0}";
}
if ((*p)->ip6) {
if ((*p)->ip4)
if ((*p)->v6s >= 0) {
if ((*p)->v4s >= 0)
o << ',';
o <<
"{\"active\":true"
@ -1074,9 +1094,9 @@ int main(int argc,char **argv)
{
std::lock_guard<std::mutex> l(s_peers_l);
for(auto p=s_peers.begin();p!=s_peers.end();++p) {
if ((*p)->ip4)
if ((*p)->v4s >= 0)
ips[(*p)->ip4].insert((*p)->id.address());
if ((*p)->ip6)
if ((*p)->v6s >= 0)
ips[(*p)->ip6].insert((*p)->id.address());
}
}
@ -1257,13 +1277,13 @@ int main(int argc,char **argv)
{
char ip4[128],ip6[128],ver[128];
for(auto p=sp.begin();p!=sp.end();++p) {
if ((*p)->ip4) {
if ((*p)->v4s >= 0) {
(*p)->ip4.toString(ip4);
} else {
ip4[0] = '-';
ip4[1] = 0;
}
if ((*p)->ip6) {
if ((*p)->v6s >= 0) {
(*p)->ip6.toString(ip6);
} else {
ip6[0] = '-';