From c9c17eaddd6380724a3b0360e8a2e5ae8f84a69c Mon Sep 17 00:00:00 2001
From: Adam Ierymenko <adam.ierymenko@gmail.com>
Date: Sun, 11 Nov 2018 22:35:15 -0800
Subject: [PATCH] Retire RethinkDB, simple receive path multithreading.

---
 {controller => attic}/RethinkDB.cpp      |  0
 {controller => attic}/RethinkDB.hpp      |  0
 controller/EmbeddedNetworkController.hpp |  3 +
 service/OneService.cpp                   | 76 ++++++++++++++++++++++++
 4 files changed, 79 insertions(+)
 rename {controller => attic}/RethinkDB.cpp (100%)
 rename {controller => attic}/RethinkDB.hpp (100%)

diff --git a/controller/RethinkDB.cpp b/attic/RethinkDB.cpp
similarity index 100%
rename from controller/RethinkDB.cpp
rename to attic/RethinkDB.cpp
diff --git a/controller/RethinkDB.hpp b/attic/RethinkDB.hpp
similarity index 100%
rename from controller/RethinkDB.hpp
rename to attic/RethinkDB.hpp
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index df6d4a7b2..c3f121c54 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -146,10 +146,13 @@ private:
 	Identity _signingId;
 	std::string _signingIdAddressString;
 	NetworkController::Sender *_sender;
+
 	std::unique_ptr<DB> _db;
 	BlockingQueue< _RQEntry * > _queue;
+
 	std::vector<std::thread> _threads;
 	std::mutex _threads_l;
+
 	std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
 	std::mutex _memberStatus_l;
 };
diff --git a/service/OneService.cpp b/service/OneService.cpp
index b14192346..86bae730c 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -34,6 +34,9 @@
 #include <vector>
 #include <algorithm>
 #include <list>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
 
 #include "../version.h"
 #include "../include/ZeroTierOne.h"
@@ -434,6 +437,8 @@ struct TcpConnection
 	Mutex writeq_m;
 };
 
+#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 4
+
 class OneServiceImpl : public OneService
 {
 public:
@@ -459,6 +464,18 @@ public:
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+	struct {
+		uint8_t data[2048];
+		std::thread thr;
+		int64_t sock;
+		struct sockaddr_storage from;
+		int size;
+		std::condition_variable cond;
+		std::mutex lock;
+	} _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
+#endif
+
 	// Local configuration and memo-ized information from it
 	json _localConfig;
 	Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
@@ -587,6 +604,39 @@ public:
 		_ports[0] = 0;
 		_ports[1] = 0;
 		_ports[2] = 0;
+
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+			_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
+				std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+				for(;;) {
+					_incomingPacketWorker[tn].cond.wait(l);
+					if (_incomingPacketWorker[tn].size < 0) {
+						break;
+					} else if (_incomingPacketWorker[tn].size > 0) {
+						const ZT_ResultCode rc = _node->processWirePacket(
+							(void *)0,
+							OSUtils::now(),
+							_incomingPacketWorker[tn].sock,
+							&(_incomingPacketWorker[tn].from),
+							_incomingPacketWorker[tn].data,
+							(unsigned int)_incomingPacketWorker[tn].size,
+							&_nextBackgroundTaskDeadline);
+						if (ZT_ResultCode_isFatal(rc)) {
+							char tmp[256];
+							OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+							Mutex::Lock _l(_termReason_m);
+							_termReason = ONE_UNRECOVERABLE_ERROR;
+							_fatalErrorMessage = tmp;
+							this->terminate();
+							break;
+						}
+					}
+				}
+			});
+		}
+#endif
+
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
 #endif
@@ -594,6 +644,17 @@ public:
 
 	virtual ~OneServiceImpl()
 	{
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+			_incomingPacketWorker[tn].lock.lock();
+			_incomingPacketWorker[tn].size = -1;
+			_incomingPacketWorker[tn].lock.unlock();
+			_incomingPacketWorker[tn].cond.notify_all();
+		}
+		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+			_incomingPacketWorker[tn].thr.join();
+		}
+#endif
 		_binder.closeAll(_phy);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket6);
@@ -1840,6 +1901,20 @@ public:
 	{
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 			_lastDirectReceiveFromGlobal = OSUtils::now();
+#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+		unsigned long cksum = 0;
+		for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
+			cksum += ((uint8_t *)from)[i];
+		}
+		const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
+		_incomingPacketWorker[tn].lock.lock();
+		memcpy(_incomingPacketWorker[tn].data,data,len);
+		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
+		memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
+		_incomingPacketWorker[tn].size = (int)len;
+		_incomingPacketWorker[tn].lock.unlock();
+		_incomingPacketWorker[tn].cond.notify_all();
+#else
 		const ZT_ResultCode rc = _node->processWirePacket(
 			(void *)0,
 			OSUtils::now(),
@@ -1856,6 +1931,7 @@ public:
 			_fatalErrorMessage = tmp;
 			this->terminate();
 		}
+#endif
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)