From 02c3727ccdf27bf9ce77877f382d300a47531810 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 18 Apr 2016 10:21:38 -0700 Subject: [PATCH] . --- cluster-geo/README.md | 16 -- cluster-geo/cluster-geo.exe | 13 -- cluster-geo/cluster-geo/cluster-geo.js | 116 ---------- cluster-geo/cluster-geo/config.js.sample | 7 - cluster-geo/cluster-geo/package.json | 16 -- service/ClusterGeoIpService.cpp | 269 ++++++++++++----------- service/ClusterGeoIpService.hpp | 96 +++++--- 7 files changed, 211 insertions(+), 322 deletions(-) delete mode 100644 cluster-geo/README.md delete mode 100755 cluster-geo/cluster-geo.exe delete mode 100644 cluster-geo/cluster-geo/cluster-geo.js delete mode 100644 cluster-geo/cluster-geo/config.js.sample delete mode 100644 cluster-geo/cluster-geo/package.json diff --git a/cluster-geo/README.md b/cluster-geo/README.md deleted file mode 100644 index 23a097ad1..000000000 --- a/cluster-geo/README.md +++ /dev/null @@ -1,16 +0,0 @@ -Cluster GeoIP Service -====== - -In cluster mode (build with ZT\_ENABLE\_CLUSTER and install a cluster definition file), ZeroTier One can use geographic IP lookup to steer clients toward members of a cluster that are physically closer and are therefore very likely to offer lower latency and better performance. Ordinary non-clustered ZeroTier endpoints will have no use for this code. - -If a cluster-mode instance detects a file in the ZeroTier home folder called *cluster-geo.exe*, it attempts to execute it. If this program runs, it receives IP addresses on STDIN and produces lines of CSV on STDOUT with the following format: - - IP,result code,latitude,longitude,x,y,z - -IPv6 IPs must be sent *without* compression / zero-removal. - -The first field is the IP echoed back. The second field is 0 if the result is pending and may be ready in the future or 1 if the result is ready now. If the second field is 0 the remaining fields should be 0. Otherwise the remaining fields contain the IP's latitude, longitude, and X/Y/Z coordinates. - -ZeroTier's cluster route optimization code only uses the X/Y/Z values. These are computed by this cluster-geo code as the spherical coordinates of the IP address using the Earth's center as the point of origin and using an approximation of the Earth as a sphere. This doesn't yield *exact* coordinates, but it's good enough for our purposes since the goal is to route clients to the geographically closest endpoint. - -To install, copy *cluster-geo.exe* and the *cluster-geo/* subfolder into the ZeroTier home. Then go into *cluster-geo/* and run *npm install* to install the project's dependencies. A recent (4.x or newer) version of NodeJS is recommended. You will also need a [MaxMind GeoIP2 Precision Services](https://www.maxmind.com/) license key. The *MaxMind GeoIP2 City* tier is required since this supplies actual coordinates. It's a commercial service but is very inexpensive and offers very good accuracy for both IPv4 and IPv6 addresses. The *cluster-geo.js* program caches results in a LevelDB database for up to 120 days to reduce GeoIP API queries. diff --git a/cluster-geo/cluster-geo.exe b/cluster-geo/cluster-geo.exe deleted file mode 100755 index 56b76e0d4..000000000 --- a/cluster-geo/cluster-geo.exe +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -export PATH=/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin - -cd `dirname $0` -if [ ! -d cluster-geo -o ! -f cluster-geo/cluster-geo.js ]; then - echo 'Cannot find ./cluster-geo containing NodeJS script files.' - exit 1 -fi - -cd cluster-geo - -exec node --harmony cluster-geo.js diff --git a/cluster-geo/cluster-geo/cluster-geo.js b/cluster-geo/cluster-geo/cluster-geo.js deleted file mode 100644 index 77871fe33..000000000 --- a/cluster-geo/cluster-geo/cluster-geo.js +++ /dev/null @@ -1,116 +0,0 @@ -"use strict"; - -// -// GeoIP lookup service -// - -// GeoIP cache TTL in ms -var CACHE_TTL = (60 * 60 * 24 * 120 * 1000); // 120 days - -// Globally increase event emitter maximum listeners -//var EventEmitter = require('events'); -//EventEmitter.prototype._maxListeners = 1000; -//process.setMaxListeners(1000); - -// Load config -var config = require(__dirname + '/config.js'); - -if (!config.maxmind) { - console.error('FATAL: only MaxMind GeoIP2 is currently supported and is not configured in config.js'); - process.exit(1); -} - -var geo = require('geoip2ws')(config.maxmind); -var cache = require('levelup')(__dirname + '/cache.leveldb'); - -function lookup(ip,callback) -{ - if (!ip) - return callback(null,null); - - var ipKey = ip; - if ((ipKey.indexOf(':') === 4)&&(ipKey.length > 19)) - ipKey = ipKey.substr(0,19); // we key in the cache using only the first 64 bits of IPv6 addresses - - cache.get(ipKey,function(err,cachedEntryJson) { - - if ((!err)&&(cachedEntryJson)) { - try { - let cachedEntry = JSON.parse(cachedEntryJson.toString()); - if (cachedEntry) { - let ts = cachedEntry.ts; - let r = cachedEntry.r; - if ((ts)&&((Date.now() - ts) < CACHE_TTL)) { - //console.error(ip+': cached!'); - return callback(null,(r) ? r : null); - } - } - } catch (e) {} - } - - cache.put(ipKey,JSON.stringify({ - ts: Date.now() - (CACHE_TTL - 30000), // set ts to expire in 30 seconds while the query is in progress - r: null - }),function(err) { - geo(ip,function(err,result) { - if (err) { - return callback(err,null); - } - - if (!result) - result = null; - - cache.put(ipKey,JSON.stringify({ - ts: Date.now(), - r: result - }),function(err) { - //if (err) - // console.error('Error saving to cache: '+err); - return callback(null,result); - }); - }); - }); - - }); -}; - -var linebuf = ''; -process.stdin.on('readable',function() { - var chunk; - while (null !== (chunk = process.stdin.read())) { - for(var i=0;i 0) { - let ip = linebuf; - lookup(ip,function(err,result) { - if ((err)||(!result)||(!result.location)) { - return process.stdout.write(ip+',0,0,0,0,0,0\n'); - } else { - let lat = parseFloat(result.location.latitude); - let lon = parseFloat(result.location.longitude); - - // Convert to X,Y,Z coordinates from Earth's origin, Earth-as-sphere approximation. - let latRadians = lat * 0.01745329251994; // PI / 180 - let lonRadians = lon * 0.01745329251994; // PI / 180 - let cosLat = Math.cos(latRadians); - let x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers - let y = Math.round(6371.0 * Math.sin(latRadians)); - let z = Math.round(6371.0 * cosLat * Math.sin(lonRadians)); - - return process.stdout.write(ip+',1,'+lat+','+lon+','+x+','+y+','+z+'\n'); - } - }); - } - linebuf = ''; - } else { - linebuf += String.fromCharCode(c); - } - } - } -}); - -process.stdin.on('end',function() { - cache.close(); - process.exit(0); -}); diff --git a/cluster-geo/cluster-geo/config.js.sample b/cluster-geo/cluster-geo/config.js.sample deleted file mode 100644 index ec1ebfea4..000000000 --- a/cluster-geo/cluster-geo/config.js.sample +++ /dev/null @@ -1,7 +0,0 @@ -// MaxMind GeoIP2 config -module.exports.maxmind = { - userId: 1234, - licenseKey: 'asdf', - service: 'city', - requestTimeout: 1000 -}; diff --git a/cluster-geo/cluster-geo/package.json b/cluster-geo/cluster-geo/package.json deleted file mode 100644 index f7207a0db..000000000 --- a/cluster-geo/cluster-geo/package.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "cluster-geo", - "version": "1.0.0", - "description": "Cluster GEO-IP Query Service", - "main": "cluster-geo.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "ZeroTier, Inc.", - "license": "GPL-3.0", - "dependencies": { - "geoip2ws": "^1.7.1", - "leveldown": "^1.4.4", - "levelup": "^1.3.0" - } -} diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp index e9a71ba18..c1483cac2 100644 --- a/service/ClusterGeoIpService.cpp +++ b/service/ClusterGeoIpService.cpp @@ -18,168 +18,181 @@ #ifdef ZT_ENABLE_CLUSTER -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include -#include +#include #include "ClusterGeoIpService.hpp" + #include "../node/Utils.hpp" +#include "../node/InetAddress.hpp" #include "../osdep/OSUtils.hpp" -// 120 days -#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL 10368000000ULL +#define ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY 10000 namespace ZeroTier { -ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) : - _pathToExe(pathToExe), - _sOutputFd(-1), - _sInputFd(-1), - _sPid(0), - _run(true) +ClusterGeoIpService::ClusterGeoIpService() : + _pathToCsv(), + _ipStartColumn(-1), + _ipEndColumn(-1), + _latitudeColumn(-1), + _longitudeColumn(-1), + _lastFileCheckTime(0), + _csvModificationTime(0), + _csvFileSize(0) { - _thread = Thread::start(this); } ClusterGeoIpService::~ClusterGeoIpService() { - _run = false; - long p = _sPid; - if (p > 0) { - ::kill(p,SIGTERM); - Thread::sleep(500); - ::kill(p,SIGKILL); - } - Thread::join(_thread); } bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) { - InetAddress ipNoPort(ip); - ipNoPort.setPort(0); // we index cache by IP only - const uint64_t now = OSUtils::now(); + Mutex::Lock _l(_lock); - bool r = false; - { - Mutex::Lock _l(_cache_m); - std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); - if (c != _cache.end()) { - x = c->second.x; - y = c->second.y; - z = c->second.z; - if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL) + if ((_pathToCsv.length() > 0)&&((OSUtils::now() - _lastFileCheckTime) > ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY)) { + _lastFileCheckTime = OSUtils::now(); + if ((_csvFileSize != OSUtils::getFileSize(_pathToCsv.c_str()))||(_csvModificationTime != OSUtils::getLastModified(_pathToCsv.c_str()))) + _load(_pathToCsv.c_str(),_ipStartColumn,_ipEndColumn,_latitudeColumn,_longitudeColumn); + } + + /* We search by looking up the upper bound of the sorted vXdb vectors + * and then iterating down for a matching IP range. We stop when we hit + * the beginning or an entry whose start and end are before the IP we + * are searching. */ + + if ((ip.ss_family == AF_INET)&&(_v4db.size() > 0)) { + _V4E key; + key.start = Utils::ntoh((uint32_t)(reinterpret_cast(&ip)->sin_addr.s_addr)); + std::vector<_V4E>::const_iterator i(std::upper_bound(_v4db.begin(),_v4db.end(),key)); + while (i != _v4db.begin()) { + --i; + if ((key->start >= i->start)&&(key->start <= i->end)) { + x = i->x; + y = i->y; + z = i->z; return true; - else r = true; // return true but refresh as well + } else if ((key->start > i->start)&&(key->start > i->end)) + break; + } + } else if ((ip.ss_family == AF_INET6)&&(_v6db.size() > 0)) { + _V6E key; + memcpy(key.start,reinterpret_cast(&ip)->sin6_addr.s6_addr,16); + std::vector<_V6E>::const_iterator i(std::upper_bound(_v6db.begin(),_v6db.end(),key)); + while (i != _v6db.begin()) { + --i; + const int s_vs_s = memcmp(key->start,i->start,16); + const int s_vs_e = memcmp(key->start,i->end,16); + if ((s_vs_s >= 0)&&(s_vs_e <= 0)) { + x = i->x; + y = i->y; + z = i->z; + return true; + } else if ((s_vs_s > 0)&&(s_vs_e > 0)) + break; } } - { - Mutex::Lock _l(_sOutputLock); - if (_sOutputFd >= 0) { - std::string ips(ipNoPort.toIpString()); - ips.push_back('\n'); - //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); - ::write(_sOutputFd,ips.data(),ips.length()); - } - } - - return r; + return false; } -void ClusterGeoIpService::threadMain() - throw() +static void _parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E> &v6db,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn) { - char linebuf[65536]; - char buf[65536]; - long n,lineptr; + std::vector ls(Utils::split(line,",\t","\\","\"'")); + if ( ((ipStartColumn >= 0)&&(ipStartColumn < (int)ls.size()))&& + ((ipEndColumn >= 0)&&(ipEndColumn < (int)ls.size()))&& + ((latitudeColumn >= 0)&&(latitudeColumn < (int)ls.size()))&& + ((longitudeColumn >= 0)&&(longitudeColumn < (int)ls.size())) ) { + InetAddress ipStart(ls[ipStartColumn].c_str(),0); + InetAddress ipEnd(ls[ipEndColumn].c_str(),0); + const double lat = strtod(ls[latitudeColumn].c_str(),(char **)0); + const double lon = strtod(ls[longitudeColumn].c_str(),(char **)0); - while (_run) { - { - Mutex::Lock _l(_sOutputLock); + if ((ipStart.ss_family == ipEnd.ss_family)&&(ipStart)&&(ipEnd)&&(std::isfinite(lat))&&(std::isfinite(lon))) { + const double latRadians = lat * 0.01745329251994; // PI / 180 + const double lonRadians = lon * 0.01745329251994; // PI / 180 + const double cosLat = cos(latRadians); + const int x = (int)round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers + const int y = (int)round(6371.0 * sin(latRadians)); + const int z = (int)round(6371.0 * cosLat * Math.sin(lonRadians)); - _sOutputFd = -1; - _sInputFd = -1; - _sPid = 0; - - int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output - int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input - ::pipe(stdinfds); - ::pipe(stdoutfds); - - long p = (long)::vfork(); - if (p < 0) { - Thread::sleep(500); - continue; - } else if (p == 0) { - ::close(stdinfds[1]); - ::close(stdoutfds[0]); - ::dup2(stdinfds[0],STDIN_FILENO); - ::dup2(stdoutfds[1],STDOUT_FILENO); - ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0); - ::exit(1); - } else { - ::close(stdinfds[0]); - ::close(stdoutfds[1]); - _sOutputFd = stdinfds[1]; - _sInputFd = stdoutfds[0]; - _sPid = p; + if (ipStart.ss_family == AF_INET) { + v4db.push_back(_V4E()); + v4db.back().start = Utils::ntoh((uint32_t)(reinterpret_cast(&ipStart)->sin_addr.s_addr)); + v4db.back().end = Utils::ntoh((uint32_t)(reinterpret_cast(&ipEnd)->sin_addr.s_addr)); + v4db.back().x = x; + v4db.back().y = y; + v4db.back().z = z; + } else if (ipStart.ss_family == AF_INET6) { + v6db.push_back(_V6E()); + memcpy(v6db.back().start,reinterpret_cast(&ipStart)->sin6_addr.s6_addr,16); + memcpy(v6db.back().end,reinterpret_cast(&ipEnd)->sin6_addr.s6_addr,16); + v6db.back().x = x; + v6db.back().y = y; + v6db.back().z = z; } } + } +} - lineptr = 0; - while (_run) { - n = ::read(_sInputFd,buf,sizeof(buf)); - if (n <= 0) { - if (errno == EINTR) - continue; - else break; - } - for(long i=0;i (long)sizeof(linebuf)) - lineptr = 0; - if ((buf[i] == '\n')||(buf[i] == '\r')) { +long ClusterGeoIpService::_load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn) +{ + // assumes _lock is locked + + FILE *f = fopen(pathToCsv,"rb"); + if (!f) + return -1; + + std::vector<_V4E> v4db; + std::vector<_V6E> v6db; + + char buf[4096]; + char linebuf[1024]; + unsigned int lineptr = 0; + for(;;) { + int n = (int)fread(buf,1,sizeof(buf),f); + if (n <= 0) + break; + for(int i=0;i 0) { - //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); - try { - std::vector result(Utils::split(linebuf,",","","")); - if ((result.size() >= 7)&&(result[1] == "1")) { - InetAddress rip(result[0],0); - if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) { - _CE ce; - ce.ts = OSUtils::now(); - ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); - ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); - ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); - //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); - { - Mutex::Lock _l2(_cache_m); - _cache[rip] = ce; - } - } - } - } catch ( ... ) {} - } - lineptr = 0; - } else linebuf[lineptr++] = buf[i]; - } + _parseLine(linebuf,v4db,v6db,ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn); + } + lineptr = 0; + } else if (lineptr < (unsigned int)sizeof(linebuf)) + linebuf[lineptr++] = buf[i]; } + } + if (lineptr) { + linebuf[lineptr] = (char)0; + _parseLine(linebuf,v4db,v6db,ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn); + } - ::close(_sOutputFd); - ::close(_sInputFd); - ::kill(_sPid,SIGTERM); - Thread::sleep(250); - ::kill(_sPid,SIGKILL); - ::waitpid(_sPid,(int *)0,0); + fclose(f); + + if ((v4db.size() > 0)||(v6db.size() > 0)) { + std::sort(v4db.begin(),v4db.end()); + std::sort(v6db.begin(),v6db.end()); + + _pathToCsv = pathToCsv; + _ipStartColumn = ipStartColumn; + _ipEndColumn = ipEndColumn; + _latitudeColumn = latitudeColumn; + _longitudeColumn = longitudeColumn; + + _lastFileCheckTime = OSUtils::now(); + _csvModificationTime = OSUtils::getLastModified(pathToCsv); + _csvFileSize = OSUtils::getFileSize(pathToCsv); + + _v4db.swap(v4db); + _v6db.swap(v6db); + + return (long)(_v4db.size() + _v6db.size()); + } else { + return 0; } } diff --git a/service/ClusterGeoIpService.hpp b/service/ClusterGeoIpService.hpp index 11c144a20..f4fd97591 100644 --- a/service/ClusterGeoIpService.hpp +++ b/service/ClusterGeoIpService.hpp @@ -21,37 +21,61 @@ #ifdef ZT_ENABLE_CLUSTER +#include +#include +#include +#include + #include -#include #include +#include #include "../node/Constants.hpp" -#include "../node/InetAddress.hpp" #include "../node/Mutex.hpp" -#include "../osdep/Thread.hpp" namespace ZeroTier { /** - * Runs the Cluster GeoIP service in the background and resolves geoIP queries + * Loads a DBIP CSV into memory for fast lookup, reloading as needed + * + * This was designed around the CSV from https://db-ip.com but can be used + * with any similar GeoIP CSV database that is presented in the form of an + * IP range and lat/long coordinates. + * + * It loads the whole database into memory, which can be kind of large. If + * the CSV file changes, the changes are loaded automatically. */ class ClusterGeoIpService { public: - /** - * @param pathToExe Path to cluster geo-resolution service executable - */ - ClusterGeoIpService(const char *pathToExe); - + ClusterGeoIpService(); ~ClusterGeoIpService(); + /** + * Load or reload CSV file + * + * CSV column indexes start at zero. CSVs can be quoted with single or + * double quotes. Whitespace before or after commas is ignored. Backslash + * may be used for escaping whitespace as well. + * + * @param pathToCsv Path to (uncompressed) CSV file + * @param ipStartColumn Column with IP range start + * @param ipEndColumn Column with IP range end (inclusive) + * @param latitudeColumn Column with latitude + * @param longitudeColumn Column with longitude + * @return Number of valid records loaded or -1 on error (invalid file, not found, etc.) + */ + inline long load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn) + { + Mutex::Lock _l(_lock); + return _load(pathToCsv,ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn); + } + /** * Attempt to locate an IP * - * This returns true if x, y, and z are set. Otherwise it returns false - * and a geo-locate job is ordered in the background. This usually takes - * 500-1500ms to complete, after which time results will be available. - * If false is returned the supplied coordinate variables are unchanged. + * This returns true if x, y, and z are set. If the return value is false + * the values of x, y, and z are undefined. * * @param ip IPv4 or IPv6 address * @param x Reference to variable to receive X @@ -61,21 +85,41 @@ public: */ bool locate(const InetAddress &ip,int &x,int &y,int &z); - void threadMain() - throw(); - private: - const std::string _pathToExe; - int _sOutputFd; - int _sInputFd; - volatile long _sPid; - volatile bool _run; - Thread _thread; - Mutex _sOutputLock; + long _load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn); - struct _CE { uint64_t ts; int x,y,z; }; - std::map< InetAddress,_CE > _cache; - Mutex _cache_m; + std::string _pathToCsv; + int _ipStartColumn; + int _ipEndColumn; + int _latitudeColumn; + int _longitudeColumn; + + uint64_t _lastFileCheckTime; + uint64_t _csvModificationTime; + int64_t _csvFileSize; + + struct _V4E + { + uint32_t start; + uint32_t end; + int x,y,z; + + inline bool operator<(const _V4E &e) const { return (start < e.start); } + }; + + struct _V6E + { + uint8_t start[16]; + uint8_t end[16]; + int x,y,z; + + inline bool operator<(const _V6E &e) const { return (memcmp(start,e.start,16) < 0); } + }; + + std::vector<_V4E> _v4db; + std::vector<_V6E> _v6db; + + Mutex _lock; }; } // namespace ZeroTier