From 74dc41c7c73669f5575851c830050747e332e38d Mon Sep 17 00:00:00 2001
From: Grant Limberg <glimberg@users.noreply.github.com>
Date: Thu, 4 May 2023 07:58:02 -0700
Subject: [PATCH] Peer metrics (#1995)

* Adding peer metrics

The metrics still need to be wired up for use.

* per peer packet metrics

* Fix crash from bad instantiation of histogram

* separate alive & dead path counts

* Add peer metric update block

* add peer latency values in doPingAndKeepalive

* prevent deadlock

* peer latency histogram actually works now

* cleanup

* capture counts of packets to specific peers

---------

Co-authored-by: Joseph Henry <joseph.henry@zerotier.com>
---
 .../core/include/prometheus/histogram.h       |  18 +--
 node/Metrics.cpp                              |  16 +++
 node/Metrics.hpp                              |   7 ++
 node/Peer.cpp                                 | 107 +++++++++++-------
 node/Peer.hpp                                 |   7 ++
 5 files changed, 102 insertions(+), 53 deletions(-)

diff --git a/ext/prometheus-cpp-lite-1.0/core/include/prometheus/histogram.h b/ext/prometheus-cpp-lite-1.0/core/include/prometheus/histogram.h
index ac558fa21..0afe41758 100644
--- a/ext/prometheus-cpp-lite-1.0/core/include/prometheus/histogram.h
+++ b/ext/prometheus-cpp-lite-1.0/core/include/prometheus/histogram.h
@@ -28,15 +28,9 @@ namespace prometheus {
   /// a data race.
   template <typename Value_ = uint64_t>
   class Histogram : public Metric {
-
-      using BucketBoundaries = std::vector<Value_>;
-
-      const BucketBoundaries       bucket_boundaries_;
-      std::vector<Counter<Value_>> bucket_counts_;
-      Gauge<Value_>                sum_;
-
     public:
       using Value  = Value_;
+      using BucketBoundaries = std::vector<Value_>;
       using Family = CustomFamily<Histogram<Value>>;
 
       static const Metric::Type static_type = Metric::Type::Histogram;
@@ -69,7 +63,7 @@ namespace prometheus {
           bucket_boundaries_.begin(),
           std::find_if(
             std::begin(bucket_boundaries_), std::end(bucket_boundaries_),
-            [value](const double boundary) { return boundary >= value; })));
+            [value](const Value boundary) { return boundary >= value; })));
         sum_.Increment(value);
         bucket_counts_[bucket_index].Increment();
       }
@@ -110,7 +104,7 @@ namespace prometheus {
           bucket.cumulative_count = cumulative_count;
           bucket.upper_bound = (i == bucket_boundaries_.size()
                                     ? std::numeric_limits<double>::infinity()
-                                    : bucket_boundaries_[i]);
+                                    : static_cast<double>(bucket_boundaries_[i]));
           metric.histogram.bucket.push_back(std::move(bucket));
         }
         metric.histogram.sample_count = cumulative_count;
@@ -119,6 +113,12 @@ namespace prometheus {
         return metric;
       }
 
+    private:
+      const BucketBoundaries       bucket_boundaries_;
+      std::vector<Counter<Value_>> bucket_counts_;
+      Gauge<Value_>                sum_;
+
+
   };
 
   /// \brief Return a builder to configure and register a Histogram metric.
diff --git a/node/Metrics.cpp b/node/Metrics.cpp
index 7c10540e5..e20f06c32 100644
--- a/node/Metrics.cpp
+++ b/node/Metrics.cpp
@@ -176,6 +176,22 @@ namespace ZeroTier {
         prometheus::simpleapi::counter_family_t network_outgoing_packets
         { "zt_network_outgoing_packets", "number of outgoing packets per network" };
 
+        // PeerMetrics
+        prometheus::CustomFamily<prometheus::Histogram<uint64_t>> &peer_latency = 
+        prometheus::Builder<prometheus::Histogram<uint64_t>>()
+            .Name("zt_peer_latency")
+            .Help("peer latency (ms)")
+            .Register(prometheus::simpleapi::registry);
+    
+        prometheus::simpleapi::gauge_family_t peer_path_count
+        { "zt_peer_path_count", "number of paths to peer" };
+        prometheus::simpleapi::counter_family_t peer_incoming_packets
+        { "zt_peer_incoming_packets", "number of incoming packets from a peer" };
+        prometheus::simpleapi::counter_family_t peer_outgoing_packets
+        { "zt_peer_outgoing_packets", "number of outgoing packets to a peer" };
+        prometheus::simpleapi::counter_family_t peer_packet_errors
+        { "zt_peer_packet_errors" , "number of incoming packet errors from a peer" };
+
         // General Controller Metrics
         prometheus::simpleapi::gauge_metric_t   network_count
         {"controller_network_count", "number of networks the controller is serving"};
diff --git a/node/Metrics.hpp b/node/Metrics.hpp
index a3efcc284..f78a0f157 100644
--- a/node/Metrics.hpp
+++ b/node/Metrics.hpp
@@ -107,6 +107,13 @@ namespace ZeroTier {
         extern prometheus::simpleapi::counter_family_t network_incoming_packets;
         extern prometheus::simpleapi::counter_family_t network_outgoing_packets;
 
+        // Peer Metrics
+        extern prometheus::CustomFamily<prometheus::Histogram<uint64_t>> &peer_latency;
+        extern prometheus::simpleapi::gauge_family_t peer_path_count;
+        extern prometheus::simpleapi::counter_family_t peer_incoming_packets;
+        extern prometheus::simpleapi::counter_family_t peer_outgoing_packets;
+        extern prometheus::simpleapi::counter_family_t peer_packet_errors;
+
         // General Controller Metrics
         extern prometheus::simpleapi::gauge_metric_t   network_count;
         extern prometheus::simpleapi::gauge_metric_t   member_count;
diff --git a/node/Peer.cpp b/node/Peer.cpp
index c46bdf9d2..a08bebbf7 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -28,11 +28,6 @@ namespace ZeroTier {
 
 static unsigned char s_freeRandomByteCounter = 0;
 
-char * peerIDString(const Identity &id) {
-	char out[16];
-	return id.address().toString(out);
-}
-
 Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) :
 	RR(renv),
 	_lastReceive(0),
@@ -55,7 +50,13 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
 	_directPathPushCutoffCount(0),
 	_echoRequestCutoffCount(0),
 	_localMultipathSupported(false),
-	_lastComputedAggregateMeanLatency(0)
+	_lastComputedAggregateMeanLatency(0),
+	_peer_latency{Metrics::peer_latency.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}}, std::vector<uint64_t>{1,3,6,10,30,60,100,300,600,1000})},
+	_alive_path_count{Metrics::peer_path_count.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())},{"status","alive"}})},
+	_dead_path_count{Metrics::peer_path_count.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())},{"status","dead"}})},
+	_incoming_packet{Metrics::peer_incoming_packets.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})},
+	_outgoing_packet{Metrics::peer_outgoing_packets.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})},
+	_packet_errors{Metrics::peer_packet_errors.Add({{"node_id", OSUtils::nodeIDStr(peerIdentity.address().toInt())}})}
 {
 	if (!myIdentity.agree(peerIdentity,_key)) {
 		throw ZT_EXCEPTION_INVALID_ARGUMENT;
@@ -96,7 +97,7 @@ void Peer::received(
 		default:
 			break;
 	}
-
+	_incoming_packet++;
 	recordIncomingPacket(path, packetId, payloadLength, verb, flowId, now);
 
 	if (trustEstablished) {
@@ -519,54 +520,70 @@ void Peer::performMultipathStateCheck(void *tPtr, int64_t now)
 unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 {
 	unsigned int sent = 0;
-	Mutex::Lock _l(_paths_m);
+	{
+		Mutex::Lock _l(_paths_m);
 
-	performMultipathStateCheck(tPtr, now);
+		performMultipathStateCheck(tPtr, now);
 
-	const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
-	if (sendFullHello) {
-		_lastSentFullHello = now;
-	}
-
-	// Right now we only keep pinging links that have the maximum priority. The
-	// priority is used to track cluster redirections, meaning that when a cluster
-	// redirects us its redirect target links override all other links and we
-	// let those old links expire.
-	long maxPriority = 0;
-	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-		if (_paths[i].p) {
-			maxPriority = std::max(_paths[i].priority,maxPriority);
-		} else {
-			break;
+		const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
+		if (sendFullHello) {
+			_lastSentFullHello = now;
 		}
-	}
 
-	bool deletionOccurred = false;
-	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-		if (_paths[i].p) {
-			// Clean expired and reduced priority paths
-			if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
-				if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
-					attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello);
-					_paths[i].p->sent(now);
-					sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
-				}
+		// Right now we only keep pinging links that have the maximum priority. The
+		// priority is used to track cluster redirections, meaning that when a cluster
+		// redirects us its redirect target links override all other links and we
+		// let those old links expire.
+		long maxPriority = 0;
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p) {
+				maxPriority = std::max(_paths[i].priority,maxPriority);
 			} else {
-				_paths[i] = _PeerPath();
-				deletionOccurred = true;
+				break;
 			}
 		}
-		if (!_paths[i].p || deletionOccurred) {
-			for(unsigned int j=i;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
-				if (_paths[j].p && i != j) {
-					_paths[i] = _paths[j];
-					_paths[j] = _PeerPath();
-					break;
+
+		bool deletionOccurred = false;
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p) {
+				// Clean expired and reduced priority paths
+				if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
+					if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
+						attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello);
+						_paths[i].p->sent(now);
+						sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
+					}
+				} else {
+					_paths[i] = _PeerPath();
+					deletionOccurred = true;
 				}
 			}
-			deletionOccurred = false;
+			if (!_paths[i].p || deletionOccurred) {
+				for(unsigned int j=i;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
+					if (_paths[j].p && i != j) {
+						_paths[i] = _paths[j];
+						_paths[j] = _PeerPath();
+						break;
+					}
+				}
+				deletionOccurred = false;
+			}
 		}
+		uint16_t alive_path_count_tmp = 0, dead_path_count_tmp = 0;
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p) {
+				if (_paths[i].p->alive(now)) {
+					alive_path_count_tmp++;
+				}
+				else {
+					dead_path_count_tmp++;
+				}
+			}
+		}
+		_alive_path_count = alive_path_count_tmp;
+		_dead_path_count = dead_path_count_tmp;
 	}
+	_peer_latency.Observe(latency(now));
 	return sent;
 }
 
@@ -641,6 +658,7 @@ void Peer::resetWithinScope(void *tPtr,InetAddress::IpScope scope,int inetAddres
 void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
 {
+	_outgoing_packet++;
 	if (_localMultipathSupported && _bond) {
 		_bond->recordOutgoingPacket(path, packetId, payloadLength, verb, flowId, now);
 	}
@@ -648,6 +666,7 @@ void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t pack
 
 void Peer::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
 {
+	_packet_errors++;
 	if (_localMultipathSupported && _bond) {
 		_bond->recordIncomingInvalidPacket(path);
 	}
diff --git a/node/Peer.hpp b/node/Peer.hpp
index 427e78a58..cd6b871fe 100644
--- a/node/Peer.hpp
+++ b/node/Peer.hpp
@@ -598,6 +598,13 @@ private:
 	int32_t _lastComputedAggregateMeanLatency;
 
 	SharedPtr<Bond> _bond;
+
+	prometheus::Histogram<uint64_t> &_peer_latency;
+	prometheus::simpleapi::gauge_metric_t _alive_path_count;
+	prometheus::simpleapi::gauge_metric_t _dead_path_count;
+	prometheus::simpleapi::counter_metric_t _incoming_packet;
+	prometheus::simpleapi::counter_metric_t _outgoing_packet;
+	prometheus::simpleapi::counter_metric_t _packet_errors;
 };
 
 } // namespace ZeroTier