Parse CareOf and refactor some stuff in Peer

This commit is contained in:
Adam Ierymenko 2022-06-29 16:50:38 -04:00
parent 624f2e2946
commit 83aa51d90c
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 113 additions and 51 deletions

View file

@ -1,6 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::io::Write; use std::io::{Read, Write};
use crate::vl1::identity::Identity; use crate::vl1::identity::Identity;
use crate::vl1::protocol::IDENTITY_FINGERPRINT_SIZE; use crate::vl1::protocol::IDENTITY_FINGERPRINT_SIZE;
@ -34,7 +34,8 @@ impl CareOf {
for f in self.fingerprints.iter() { for f in self.fingerprints.iter() {
let _ = v.write_all(f); let _ = v.write_all(f);
} }
let _ = varint::write(&mut v, 0); // reserved for future use let _ = varint::write(&mut v, 0); // flags, reserved for future use
let _ = varint::write(&mut v, 0); // extra bytes, reserved for future use
if include_signature { if include_signature {
let _ = varint::write(&mut v, self.signature.len() as u64); let _ = varint::write(&mut v, self.signature.len() as u64);
let _ = v.write_all(self.signature.as_slice()); let _ = v.write_all(self.signature.as_slice());
@ -47,6 +48,40 @@ impl CareOf {
self.to_bytes_internal(true) self.to_bytes_internal(true)
} }
pub fn from_bytes(mut b: &[u8]) -> Option<CareOf> {
let mut f = move || -> std::io::Result<CareOf> {
let (timestamp, _) = varint::read(&mut b)?;
let mut care_of = CareOf {
timestamp: timestamp as i64,
fingerprints: Vec::new(),
signature: Vec::new(),
};
let (fingerprint_count, _) = varint::read(&mut b)?;
for _ in 0..fingerprint_count {
let mut tmp = [0_u8; IDENTITY_FINGERPRINT_SIZE];
b.read_exact(&mut tmp)?;
care_of.fingerprints.push(tmp);
}
let _ = varint::read(&mut b)?; // flags, currently ignored
let (extra_bytes, _) = varint::read(&mut b)?;
if extra_bytes > (b.len() as u64) {
return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""));
}
b = &b[(extra_bytes as usize)..];
let (signature_len, _) = varint::read(&mut b)?;
if signature_len > (b.len() as u64) {
return Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""));
}
let _ = care_of.signature.write_all(&b[..(signature_len as usize)]);
return Ok(care_of);
};
if let Ok(care_of) = f() {
Some(care_of)
} else {
None
}
}
/// Sort, deduplicate, and sign this care-of packet. /// Sort, deduplicate, and sign this care-of packet.
/// ///
/// The supplied identitiy must contain its secret keys. False is returned if there is an error. /// The supplied identitiy must contain its secret keys. False is returned if there is an error.

View file

@ -3,7 +3,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit; use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8, Ordering}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
@ -18,6 +18,7 @@ use crate::util::byte_array_range;
use crate::util::canonicalobject::CanonicalObject; use crate::util::canonicalobject::CanonicalObject;
use crate::util::debug_event; use crate::util::debug_event;
use crate::util::marshalable::Marshalable; use crate::util::marshalable::Marshalable;
use crate::vl1::careof::CareOf;
use crate::vl1::node::*; use crate::vl1::node::*;
use crate::vl1::protocol::*; use crate::vl1::protocol::*;
use crate::vl1::symmetricsecret::{EphemeralSymmetricSecret, SymmetricSecret}; use crate::vl1::symmetricsecret::{EphemeralSymmetricSecret, SymmetricSecret};
@ -32,6 +33,14 @@ struct PeerPath<SI: SystemInterface> {
last_receive_time_ticks: i64, last_receive_time_ticks: i64,
} }
struct RemoteNodeInfo {
reported_local_endpoints: HashMap<Endpoint, i64>,
care_of: Option<CareOf>,
remote_version: u64,
hello_extended_authentication: bool,
remote_protocol_version: u8,
}
/// A remote peer known to this node. /// A remote peer known to this node.
/// ///
/// Equality and hashing is implemented in terms of the identity. /// Equality and hashing is implemented in terms of the identity.
@ -50,9 +59,6 @@ pub struct Peer<SI: SystemInterface> {
// Paths sorted in descending order of quality / preference. // Paths sorted in descending order of quality / preference.
paths: Mutex<Vec<PeerPath<SI>>>, paths: Mutex<Vec<PeerPath<SI>>>,
// External addresses by this peer for the local node.
reported_local_endpoints: Mutex<HashMap<Endpoint, i64>>,
// Statistics and times of events. // Statistics and times of events.
last_send_time_ticks: AtomicI64, last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64,
@ -66,9 +72,8 @@ pub struct Peer<SI: SystemInterface> {
// Counter for assigning sequential message IDs. // Counter for assigning sequential message IDs.
message_id_counter: AtomicU64, message_id_counter: AtomicU64,
// Remote peer version information. // Other information reported by remote node.
remote_version: AtomicU64, remote_node_info: RwLock<RemoteNodeInfo>,
remote_protocol_version: AtomicU8,
} }
/// Attempt AEAD packet encryption and MAC validation. Returns message ID on success. /// Attempt AEAD packet encryption and MAC validation. Returns message ID on success.
@ -186,7 +191,6 @@ impl<SI: SystemInterface> Peer<SI> {
identity_symmetric_key: SymmetricSecret::new(static_secret), identity_symmetric_key: SymmetricSecret::new(static_secret),
ephemeral_symmetric_key: RwLock::new(None), ephemeral_symmetric_key: RwLock::new(None),
paths: Mutex::new(Vec::with_capacity(4)), paths: Mutex::new(Vec::with_capacity(4)),
reported_local_endpoints: Mutex::new(HashMap::new()),
last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
@ -194,8 +198,13 @@ impl<SI: SystemInterface> Peer<SI> {
create_time_ticks: time_ticks, create_time_ticks: time_ticks,
random_ticks_offset: next_u64_secure(), random_ticks_offset: next_u64_secure(),
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)), message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
remote_version: AtomicU64::new(0), remote_node_info: RwLock::new(RemoteNodeInfo {
remote_protocol_version: AtomicU8::new(0), reported_local_endpoints: HashMap::new(),
care_of: None,
remote_version: 0,
hello_extended_authentication: false,
remote_protocol_version: 0,
}),
} }
}) })
} }
@ -203,7 +212,7 @@ impl<SI: SystemInterface> Peer<SI> {
/// Get the remote version of this peer: major, minor, revision, and build. /// Get the remote version of this peer: major, minor, revision, and build.
/// Returns None if it's not yet known. /// Returns None if it's not yet known.
pub fn version(&self) -> Option<[u16; 4]> { pub fn version(&self) -> Option<[u16; 4]> {
let rv = self.remote_version.load(Ordering::Relaxed); let rv = self.remote_node_info.read().remote_version;
if rv != 0 { if rv != 0 {
Some([rv.wrapping_shr(48) as u16, rv.wrapping_shr(32) as u16, rv.wrapping_shr(16) as u16, rv as u16]) Some([rv.wrapping_shr(48) as u16, rv.wrapping_shr(32) as u16, rv.wrapping_shr(16) as u16, rv as u16])
} else { } else {
@ -213,7 +222,7 @@ impl<SI: SystemInterface> Peer<SI> {
/// Get the remote protocol version of this peer or None if not yet known. /// Get the remote protocol version of this peer or None if not yet known.
pub fn protocol_version(&self) -> Option<u8> { pub fn protocol_version(&self) -> Option<u8> {
let pv = self.remote_protocol_version.load(Ordering::Relaxed); let pv = self.remote_node_info.read().remote_protocol_version;
if pv != 0 { if pv != 0 {
Some(pv) Some(pv)
} else { } else {
@ -305,7 +314,7 @@ impl<SI: SystemInterface> Peer<SI> {
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
prioritize_paths(&mut paths); prioritize_paths(&mut paths);
} }
self.reported_local_endpoints.lock().retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME); self.remote_node_info.write().reported_local_endpoints.retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME);
(time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
} }
@ -353,17 +362,6 @@ impl<SI: SystemInterface> Peer<SI> {
} }
} }
fn create_session_metadata(&self, node: &Node<SI>, destination: &Endpoint) -> Vec<u8> {
let mut session_metadata = Dictionary::new();
session_metadata.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
session_metadata.set_bytes(session_metadata::CARE_OF, node.care_of_bytes());
session_metadata.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
if let Some(my_root_sets) = node.my_root_sets() {
session_metadata.set_bytes(session_metadata::MY_ROOT_SETS, my_root_sets);
}
session_metadata.to_bytes()
}
/// Send a packet to this peer, returning true on (potential) success. /// Send a packet to this peer, returning true on (potential) success.
/// ///
/// This will go directly if there is an active path, or otherwise indirectly /// This will go directly if there is an active path, or otherwise indirectly
@ -466,7 +464,8 @@ impl<SI: SystemInterface> Peer<SI> {
assert!(node.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok()); assert!(node.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok());
// Create session meta-data and append length of this section. // Create session meta-data and append length of this section.
let session_metadata = self.create_session_metadata(node, destination); let session_metadata = Dictionary::new();
let session_metadata = session_metadata.to_bytes();
let session_metadata_len = session_metadata.len() + 16; // plus nonce let session_metadata_len = session_metadata.len() + 16; // plus nonce
assert!(session_metadata_len <= 0xffff); // sanity check, should be impossible assert!(session_metadata_len <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(session_metadata_len as u16).is_ok()); assert!(packet.append_u16(session_metadata_len as u16).is_ok());
@ -610,7 +609,7 @@ impl<SI: SystemInterface> Peer<SI> {
return match verb { return match verb {
verbs::VL1_NOP => true, verbs::VL1_NOP => true,
verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, message_id, source_path, &payload).await, verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, message_id, source_path, extended_authentication, &payload).await,
verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await, verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await, verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await, verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
@ -625,14 +624,19 @@ impl<SI: SystemInterface> Peer<SI> {
return false; return false;
} }
async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, message_id: MessageId, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) -> bool { async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, message_id: MessageId, source_path: &Arc<Path<SI>>, extended_authentication: bool, payload: &PacketBuffer) -> bool {
let mut cursor = 0; let mut cursor = 0;
if let Ok(hello_fixed_headers) = payload.read_struct::<message_component_structs::HelloFixedHeaderFields>(&mut cursor) { if let Ok(hello_fixed_headers) = payload.read_struct::<message_component_structs::HelloFixedHeaderFields>(&mut cursor) {
self.remote_protocol_version.store(hello_fixed_headers.version_proto, Ordering::Relaxed);
self.remote_version
.store((hello_fixed_headers.version_major as u64).wrapping_shl(48) | (hello_fixed_headers.version_minor as u64).wrapping_shl(32) | (u16::from_be_bytes(hello_fixed_headers.version_revision) as u64).wrapping_shl(16), Ordering::Relaxed);
if let Ok(identity) = Identity::unmarshal(payload, &mut cursor) { if let Ok(identity) = Identity::unmarshal(payload, &mut cursor) {
if identity.eq(&self.identity) { if identity.eq(&self.identity) {
{
let mut remote_node_info = self.remote_node_info.write();
remote_node_info.remote_protocol_version = hello_fixed_headers.version_proto;
remote_node_info.hello_extended_authentication = extended_authentication;
remote_node_info.remote_version =
(hello_fixed_headers.version_major as u64).wrapping_shl(48) | (hello_fixed_headers.version_minor as u64).wrapping_shl(32) | (u16::from_be_bytes(hello_fixed_headers.version_revision) as u64).wrapping_shl(16);
if hello_fixed_headers.version_proto >= 20 { if hello_fixed_headers.version_proto >= 20 {
let mut session_metadata_len = payload.read_u16(&mut cursor).unwrap_or(0) as usize; let mut session_metadata_len = payload.read_u16(&mut cursor).unwrap_or(0) as usize;
if session_metadata_len > 16 { if session_metadata_len > 16 {
@ -655,6 +659,7 @@ impl<SI: SystemInterface> Peer<SI> {
} }
} }
} }
}
let mut packet = PacketBuffer::new(); let mut packet = PacketBuffer::new();
packet.set_size(packet_constants::HEADER_SIZE); packet.set_size(packet_constants::HEADER_SIZE);
@ -671,7 +676,14 @@ impl<SI: SystemInterface> Peer<SI> {
} }
if hello_fixed_headers.version_proto >= 20 { if hello_fixed_headers.version_proto >= 20 {
let session_metadata = self.create_session_metadata(node, &source_path.endpoint); let mut session_metadata = Dictionary::new();
session_metadata.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
session_metadata.set_bytes(session_metadata::CARE_OF, node.care_of_bytes());
session_metadata.set_bytes(session_metadata::SENT_TO, source_path.endpoint.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
if let Some(my_root_sets) = node.my_root_sets() {
session_metadata.set_bytes(session_metadata::MY_ROOT_SETS, my_root_sets);
}
let session_metadata = session_metadata.to_bytes();
assert!(session_metadata.len() <= 0xffff); // sanity check, should be impossible assert!(session_metadata.len() <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(session_metadata.len() as u16).is_ok()); assert!(packet.append_u16(session_metadata.len() as u16).is_ok());
assert!(packet.append_bytes(session_metadata.as_slice()).is_ok()); assert!(packet.append_bytes(session_metadata.as_slice()).is_ok());
@ -724,21 +736,36 @@ impl<SI: SystemInterface> Peer<SI> {
if let Ok(session_metadata_len) = payload.read_u16(&mut cursor) { if let Ok(session_metadata_len) = payload.read_u16(&mut cursor) {
if session_metadata_len > 0 { if session_metadata_len > 0 {
if let Ok(session_metadata) = payload.read_bytes(session_metadata_len as usize, &mut cursor) { if let Ok(session_metadata) = payload.read_bytes(session_metadata_len as usize, &mut cursor) {
if let Some(_session_metadata) = Dictionary::from_bytes(session_metadata) { if let Some(session_metadata) = Dictionary::from_bytes(session_metadata) {
// TODO let mut remote_node_info = self.remote_node_info.write();
if hops == 0 {
if let Some(reported_endpoint) = session_metadata.get_bytes(session_metadata::SENT_TO) {
if let Some(reported_endpoint) = Endpoint::from_bytes(reported_endpoint) {
let _ = remote_node_info.reported_local_endpoints.insert(reported_endpoint, time_ticks);
}
}
}
if let Some(care_of) = session_metadata.get_bytes(session_metadata::CARE_OF) {
if let Some(care_of) = CareOf::from_bytes(care_of) {
let _ = remote_node_info.care_of.insert(care_of);
}
}
} }
} }
} }
} }
} else { } else {
if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) {
let _ = self.reported_local_endpoints.lock().insert(reported_endpoint, time_ticks); if hops == 0 {
let _ = self.remote_node_info.write().reported_local_endpoints.insert(reported_endpoint, time_ticks);
}
} }
} }
if hops == 0 && !path_is_known { if hops == 0 && !path_is_known {
self.learn_path(si, source_path, time_ticks); self.learn_path(si, source_path, time_ticks);
} }
self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed);
} }
} }