Cleanup, add support for distributing a new planet file

This commit is contained in:
Adam Ierymenko 2019-09-03 14:44:13 -07:00
parent 29be175743
commit b1c22949c5
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 116 additions and 74 deletions

View file

@ -458,6 +458,9 @@ public:
* <[8] in-re packet ID>
* <[1] error code>
* <[...] error-dependent payload>
*
* If this is not in response to a single packet then verb can be
* NOP and packet ID can be zero.
*/
VERB_ERROR = 0x02,
@ -962,7 +965,11 @@ public:
ERROR_NETWORK_ACCESS_DENIED_ = 0x07, /* extra _ at end to avoid Windows name conflict */
/* Multicasts to this group are not wanted */
ERROR_UNWANTED_MULTICAST = 0x08
ERROR_UNWANTED_MULTICAST = 0x08,
/* Cannot deliver a forwarded ZeroTier packet (e.g. hops exceeded, no routes) */
/* Payload: <packet ID>, <destination>, <... additional packet ID / destinations> */
ERROR_CANNOT_DELIVER = 0x09
};
template<unsigned int C2>

View file

@ -24,6 +24,7 @@
* "port": UDP port (int)
* "httpPort": Local HTTP port for basic stats (int)
* "relayMaxHops": Max hops (up to 7)
* "planetFile": Location of planet file for pre-2.x peers (string)
* "statsRoot": If present, path to periodically save stats files (string)
* "siblings": [
* {
@ -143,7 +144,7 @@ struct RendezvousKey
*/
struct RootPeer
{
ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vMajor(-1),vMinor(-1),vRev(-1),sibling(false) {}
ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1),sibling(false) {}
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
Identity id; // Identity
@ -154,6 +155,7 @@ struct RootPeer
int64_t lastSync; // Time of last data synchronization with LF or other root state backend (currently unused)
int64_t lastEcho; // Time of last received ECHO
int64_t lastHello; // Time of last received HELLO
int vProto; // Protocol version
int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
bool sibling; // If true, this is a sibling root that will get forwards we don't know where to send
std::mutex lock;
@ -161,13 +163,14 @@ struct RootPeer
AtomicCounter __refCount;
};
static int64_t startTime;
static std::vector<int> ports;
static int relayMaxHops = 0;
static Identity self;
static std::atomic_bool run;
static json config;
static std::string statsRoot;
static int64_t s_startTime; // Time service was started
static std::vector<int> s_ports; // Ports to bind for UDP traffic
static int s_relayMaxHops = 0; // Max relay hops
static Identity s_self; // My identity (including secret)
static std::atomic_bool s_run; // Remains true until shutdown is ordered
static json s_config; // JSON config file contents
static std::string s_statsRoot; // Root to write stats, peers, etc.
static std::string s_planet; // Planet file contents to distribute with OK(HELLO) if any
static Meter inputRate;
static Meter outputRate;
@ -182,6 +185,7 @@ static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher
static std::unordered_map< InetAddress,std::set< SharedPtr<RootPeer> >,InetAddressHasher > peersByPhysAddr;
static std::unordered_map< RendezvousKey,int64_t,RendezvousKey::Hasher > lastRendezvous;
static std::mutex s_planet_l;
static std::mutex siblings_l;
static std::mutex multicastSubscriptions_l;
static std::mutex peersByIdentity_l;
@ -202,7 +206,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
inputRate.log(now,pkt.size());
if ((!fragment)&&(!pkt.fragmented())&&(dest == self.address())) {
if ((!fragment)&&(!pkt.fragmented())&&(dest == s_self.address())) {
SharedPtr<RootPeer> peer;
// If this is an un-encrypted HELLO, either learn a new peer or verify
@ -227,7 +231,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
} else {
peer.set(new RootPeer);
if (self.agree(id,peer->key)) {
if (s_self.agree(id,peer->key)) {
if (pkt.dearmor(peer->key)) {
if (!pkt.uncompress()) {
printf("%s HELLO rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
@ -297,13 +301,14 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
try {
if ((now - peer->lastHello) > 1000) {
peer->lastHello = now;
peer->vProto = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION];
peer->vMajor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MAJOR_VERSION];
peer->vMinor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MINOR_VERSION];
peer->vRev = (int)pkt.template at<uint16_t>(ZT_PROTO_VERB_HELLO_IDX_REVISION);
const uint64_t origId = pkt.packetId();
const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_HELLO);
pkt.append(origId);
pkt.append(ts);
@ -312,6 +317,13 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
pkt.append((uint8_t)0);
pkt.append((uint16_t)0);
ip->serialize(pkt);
if (peer->vProto < 11) { // send planet file for pre-2.x peers
std::lock_guard<std::mutex> pl(s_planet_l);
if (s_planet.length() > 0) {
pkt.append((uint16_t)s_planet.size());
pkt.append((const uint8_t *)s_planet.data(),s_planet.size());
}
}
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
@ -328,7 +340,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if ((now - peer->lastEcho) > 1000) {
peer->lastEcho = now;
Packet outp(source,self.address(),Packet::VERB_OK);
Packet outp(source,s_self.address(),Packet::VERB_OK);
outp.append((uint8_t)Packet::VERB_ECHO);
outp.append(pkt.packetId());
outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD);
@ -359,7 +371,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (!results.empty()) {
const uint64_t origId = pkt.packetId();
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_WHOIS);
pkt.append(origId);
for(auto p=results.begin();p!=results.end();++p)
@ -398,7 +410,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
gatherLimit = 255;
const uint64_t origId = pkt.packetId();
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
pkt.append(origId);
pkt.append(nwid);
@ -451,26 +463,29 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
// If we made it here, we are forwarding this packet to someone else and also possibly
// sending a RENDEZVOUS message.
int hops = 0;
bool introduce = false;
if (fragment) {
if ((int)reinterpret_cast<Packet::Fragment *>(&pkt)->incrementHops() > relayMaxHops) {
if ((hops = (int)reinterpret_cast<Packet::Fragment *>(&pkt)->incrementHops()) > s_relayMaxHops) {
//printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),dest.toString(astr));
discardedForwardRate.log(now,pkt.size());
return;
}
} else {
if ((int)pkt.incrementHops() > relayMaxHops) {
if ((hops = (int)pkt.incrementHops()) > s_relayMaxHops) {
//printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),dest.toString(astr));
discardedForwardRate.log(now,pkt.size());
return;
}
RendezvousKey rk(source,dest);
std::lock_guard<std::mutex> l(lastRendezvous_l);
int64_t &lr = lastRendezvous[rk];
if ((now - lr) >= 45000) {
lr = now;
introduce = true;
if (hops == 1) {
RendezvousKey rk(source,dest);
std::lock_guard<std::mutex> l(lastRendezvous_l);
int64_t &lr = lastRendezvous[rk];
if ((now - lr) >= 45000) {
lr = now;
introduce = true;
}
}
}
@ -515,7 +530,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
//printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip6.toString(ipstr2),dest.toString(astr2));
// Introduce source to destination (V6)
Packet outp(source,self.address(),Packet::VERB_RENDEZVOUS);
Packet outp(source,s_self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
dest.appendTo(outp);
outp.append((uint16_t)b->second->ip6.port());
@ -528,7 +543,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
(*a)->lastSend = now;
// Introduce destination to source (V6)
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
outp.reset(dest,s_self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
source.appendTo(outp);
outp.append((uint16_t)ip->port());
@ -544,7 +559,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
//printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip4.toString(ipstr2),dest.toString(astr2));
// Introduce source to destination (V4)
Packet outp(source,self.address(),Packet::VERB_RENDEZVOUS);
Packet outp(source,s_self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
dest.appendTo(outp);
outp.append((uint16_t)b->second->ip4.port());
@ -557,7 +572,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
(*a)->lastSend = now;
// Introduce destination to source (V4)
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
outp.reset(dest,s_self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
source.appendTo(outp);
outp.append((uint16_t)ip->port());
@ -648,7 +663,7 @@ static int bindSocket(struct sockaddr *const bindAddr)
return s;
}
static void shutdownSigHandler(int sig) { run = false; }
static void shutdownSigHandler(int sig) { s_run = false; }
int main(int argc,char **argv)
{
@ -660,7 +675,7 @@ int main(int argc,char **argv)
signal(SIGUSR2,SIG_IGN);
signal(SIGCHLD,SIG_IGN);
startTime = OSUtils::now();
s_startTime = OSUtils::now();
if (argc < 3) {
printf("Usage: zerotier-root <identity.secret> <config path>" ZT_EOL_S);
@ -673,11 +688,11 @@ int main(int argc,char **argv)
printf("FATAL: cannot read identity.secret at %s" ZT_EOL_S,argv[1]);
return 1;
}
if (!self.fromString(myIdStr.c_str())) {
if (!s_self.fromString(myIdStr.c_str())) {
printf("FATAL: cannot read identity.secret at %s (invalid identity)" ZT_EOL_S,argv[1]);
return 1;
}
if (!self.hasPrivate()) {
if (!s_self.hasPrivate()) {
printf("FATAL: cannot read identity.secret at %s (missing secret key)" ZT_EOL_S,argv[1]);
return 1;
}
@ -689,7 +704,7 @@ int main(int argc,char **argv)
return 1;
}
try {
config = json::parse(configStr);
s_config = json::parse(configStr);
} catch (std::exception &exc) {
printf("FATAL: config file at %s invalid: %s" ZT_EOL_S,argv[2],exc.what());
return 1;
@ -697,14 +712,14 @@ int main(int argc,char **argv)
printf("FATAL: config file at %s invalid: unknown exception" ZT_EOL_S,argv[2]);
return 1;
}
if (!config.is_object()) {
if (!s_config.is_object()) {
printf("FATAL: config file at %s invalid: does not contain a JSON object" ZT_EOL_S,argv[2]);
return 1;
}
}
try {
auto jport = config["port"];
auto jport = s_config["port"];
if (jport.is_array()) {
for(long i=0;i<(long)jport.size();++i) {
int port = jport[i];
@ -712,7 +727,7 @@ int main(int argc,char **argv)
printf("FATAL: invalid port in config file %d" ZT_EOL_S,port);
return 1;
}
ports.push_back(port);
s_ports.push_back(port);
}
} else {
int port = jport;
@ -720,16 +735,16 @@ int main(int argc,char **argv)
printf("FATAL: invalid port in config file %d" ZT_EOL_S,port);
return 1;
}
ports.push_back(port);
s_ports.push_back(port);
}
} catch ( ... ) {}
if (ports.empty())
ports.push_back(ZT_DEFAULT_PORT);
std::sort(ports.begin(),ports.end());
if (s_ports.empty())
s_ports.push_back(ZT_DEFAULT_PORT);
std::sort(s_ports.begin(),s_ports.end());
int httpPort = ZT_DEFAULT_PORT;
try {
httpPort = config["httpPort"];
httpPort = s_config["httpPort"];
if ((httpPort <= 0)||(httpPort > 65535)) {
printf("FATAL: invalid HTTP port in config file %d" ZT_EOL_S,httpPort);
return 1;
@ -738,29 +753,36 @@ int main(int argc,char **argv)
httpPort = ZT_DEFAULT_PORT;
}
std::string planetFilePath;
try {
statsRoot = config["statsRoot"];
while ((statsRoot.length() > 0)&&(statsRoot[statsRoot.length()-1] == ZT_PATH_SEPARATOR))
statsRoot = statsRoot.substr(0,statsRoot.length()-1);
if (statsRoot.length() > 0)
OSUtils::mkdir(statsRoot);
planetFilePath = s_config["planetFile"];
} catch ( ... ) {
statsRoot = "";
}
relayMaxHops = ZT_RELAY_MAX_HOPS;
try {
relayMaxHops = config["relayMaxHops"];
if (relayMaxHops > ZT_PROTO_MAX_HOPS)
relayMaxHops = ZT_PROTO_MAX_HOPS;
else if (relayMaxHops < 0)
relayMaxHops = 0;
} catch ( ... ) {
relayMaxHops = ZT_RELAY_MAX_HOPS;
planetFilePath = "";
}
try {
auto sibs = config["siblings"];
s_statsRoot = s_config["statsRoot"];
while ((s_statsRoot.length() > 0)&&(s_statsRoot[s_statsRoot.length()-1] == ZT_PATH_SEPARATOR))
s_statsRoot = s_statsRoot.substr(0,s_statsRoot.length()-1);
if (s_statsRoot.length() > 0)
OSUtils::mkdir(s_statsRoot);
} catch ( ... ) {
s_statsRoot = "";
}
s_relayMaxHops = ZT_RELAY_MAX_HOPS;
try {
s_relayMaxHops = s_config["s_relayMaxHops"];
if (s_relayMaxHops > ZT_PROTO_MAX_HOPS)
s_relayMaxHops = ZT_PROTO_MAX_HOPS;
else if (s_relayMaxHops < 0)
s_relayMaxHops = 0;
} catch ( ... ) {
s_relayMaxHops = ZT_RELAY_MAX_HOPS;
}
try {
auto sibs = s_config["siblings"];
if (sibs.is_array()) {
for(long i=0;i<(long)sibs.size();++i) {
auto sib = sibs[i];
@ -780,7 +802,7 @@ int main(int argc,char **argv)
ip.setPort((unsigned int)sib["port"]);
SharedPtr<RootPeer> rp(new RootPeer);
rp->id = id;
if (!self.agree(id,rp->key)) {
if (!s_self.agree(id,rp->key)) {
printf("FATAL: invalid JSON while parsing siblings section in config file: invalid identity in sibling entry (unable to execute key agreement)" ZT_EOL_S);
return 1;
}
@ -814,13 +836,13 @@ int main(int argc,char **argv)
unsigned int ncores = std::thread::hardware_concurrency();
if (ncores == 0) ncores = 1;
run = true;
s_run = true;
std::vector<std::thread> threads;
std::vector<int> sockets;
int v4Sock = -1,v6Sock = -1;
for(auto port=ports.begin();port!=ports.end();++port) {
for(auto port=s_ports.begin();port!=s_ports.end();++port) {
for(unsigned int tn=0;tn<ncores;++tn) {
struct sockaddr_in6 in6;
memset(&in6,0,sizeof(in6));
@ -963,7 +985,7 @@ int main(int argc,char **argv)
int64_t lastCleaned = 0;
int64_t lastWroteStats = 0;
int64_t lastPingedSiblings = 0;
while (run) {
while (s_run) {
//peersByIdentity_l.lock();
//peersByPhysAddr_l.lock();
//printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size());
@ -980,13 +1002,13 @@ int main(int argc,char **argv)
for(auto s=siblings.begin();s!=siblings.end();++s) {
const InetAddress *ip = nullptr;
socklen_t sl = 0;
Packet outp((*s)->id.address(),self.address(),Packet::VERB_HELLO);
Packet outp((*s)->id.address(),s_self.address(),Packet::VERB_HELLO);
outp.append((uint8_t)ZT_PROTO_VERSION);
outp.append((uint8_t)ZEROTIER_ONE_VERSION_MAJOR);
outp.append((uint8_t)ZEROTIER_ONE_VERSION_MINOR);
outp.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
outp.append((uint8_t)0);
outp.append((uint8_t)0);
outp.append((uint16_t)0);
outp.append((uint64_t)now);
self.serialize(outp,false);
s_self.serialize(outp,false);
if ((*s)->ip4) {
(*s)->ip4.serialize(outp);
ip = &((*s)->ip4);
@ -1074,11 +1096,24 @@ int main(int argc,char **argv)
}
}
// Write stats if configured to do so
if (((now - lastWroteStats) > 15000)&&(statsRoot.length() > 0)) {
// Write stats if configured to do so, and periodically refresh planet file (if any)
if (((now - lastWroteStats) > 15000)&&(s_statsRoot.length() > 0)) {
lastWroteStats = now;
std::string peersFilePath(statsRoot);
try {
if (planetFilePath.length() > 0) {
std::string planetData;
if ((OSUtils::readFile(planetFilePath.c_str(),planetData))&&(planetData.length() > 0)) {
std::lock_guard<std::mutex> pl(s_planet_l);
s_planet = planetData;
}
}
} catch ( ... ) {
std::lock_guard<std::mutex> pl(s_planet_l);
s_planet.clear();
}
std::string peersFilePath(s_statsRoot);
peersFilePath.append("/.peers.tmp");
FILE *pf = fopen(peersFilePath.c_str(),"wb");
if (pf) {
@ -1110,17 +1145,17 @@ int main(int argc,char **argv)
}
fclose(pf);
std::string peersFilePath2(statsRoot);
std::string peersFilePath2(s_statsRoot);
peersFilePath2.append("/peers");
OSUtils::rm(peersFilePath2);
OSUtils::rename(peersFilePath.c_str(),peersFilePath2.c_str());
}
std::string statsFilePath(statsRoot);
std::string statsFilePath(s_statsRoot);
statsFilePath.append("/.stats.tmp");
FILE *sf = fopen(statsFilePath.c_str(),"wb");
if (sf) {
fprintf(sf,"Uptime (seconds) : %ld" ZT_EOL_S,(long)((now - startTime) / 1000));
fprintf(sf,"Uptime (seconds) : %ld" ZT_EOL_S,(long)((now - s_startTime) / 1000));
peersByIdentity_l.lock();
fprintf(sf,"Peers : %llu" ZT_EOL_S,(unsigned long long)peersByIdentity.size());
peersByVirtAddr_l.lock();
@ -1140,7 +1175,7 @@ int main(int argc,char **argv)
fprintf(sf,"Discarded Forward BPS : %.4f" ZT_EOL_S,discardedForwardRate.perSecond(now));
fclose(sf);
std::string statsFilePath2(statsRoot);
std::string statsFilePath2(s_statsRoot);
statsFilePath2.append("/stats");
OSUtils::rm(statsFilePath2);
OSUtils::rename(statsFilePath.c_str(),statsFilePath2.c_str());