More reorg, wiring through learning our external address by proxy.

This commit is contained in:
Adam Ierymenko 2022-06-30 13:56:55 -04:00
parent 845751cea0
commit be157ccd31
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 84 additions and 45 deletions

View file

@ -48,7 +48,7 @@ pub struct Peer<SI: SystemInterface> {
canonical: CanonicalObject,
// This peer's identity.
pub(crate) identity: Identity,
pub identity: Identity,
// Static shared secret computed from agreement with identity.
identity_symmetric_key: SymmetricSecret,
@ -60,13 +60,14 @@ pub struct Peer<SI: SystemInterface> {
paths: Mutex<Vec<PeerPath<SI>>>,
// Statistics and times of events.
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
pub(crate) last_send_time_ticks: AtomicI64,
pub(crate) last_receive_time_ticks: AtomicI64,
pub(crate) last_hello_reply_time_ticks: AtomicI64,
last_forward_time_ticks: AtomicI64,
create_time_ticks: i64,
pub(crate) last_forward_time_ticks: AtomicI64,
pub(crate) last_incoming_message_id: AtomicU64,
pub(crate) create_time_ticks: i64,
// A random offset added to timestamps sent with HELLO to measure latency avoid advertising the actual tick counter.
// A random offset added to ticks sent to this peer to avoid disclosing the actual tick count.
random_ticks_offset: u64,
// Counter for assigning sequential message IDs.
@ -195,6 +196,7 @@ impl<SI: SystemInterface> Peer<SI> {
last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
last_hello_reply_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
last_incoming_message_id: AtomicU64::new(0),
create_time_ticks: time_ticks,
random_ticks_offset: next_u64_secure(),
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
@ -368,34 +370,45 @@ impl<SI: SystemInterface> Peer<SI> {
/// via a root or some other route.
///
/// It encrypts and sets the MAC and cipher fields and packet ID and other things.
pub(crate) async fn send(&self, si: &SI, node: &Node<SI>, time_ticks: i64, message_id: MessageId, packet: &mut PacketBuffer) -> bool {
if let Some(path) = self.path(node) {
let max_fragment_size = if path.endpoint.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX };
let flags_cipher_hops = if packet.len() > max_fragment_size { packet_constants::HEADER_FLAG_FRAGMENTED | security_constants::CIPHER_AES_GMAC_SIV } else { security_constants::CIPHER_AES_GMAC_SIV };
let mut aes_gmac_siv = if let Some(ephemeral_key) = self.ephemeral_symmetric_key.read().as_ref() { ephemeral_key.secret.aes_gmac_siv.get() } else { self.identity_symmetric_key.aes_gmac_siv.get() };
aes_gmac_siv.encrypt_init(&message_id.to_ne_bytes());
aes_gmac_siv.encrypt_set_aad(&get_packet_aad_bytes(self.identity.address, node.identity.address, flags_cipher_hops));
if let Ok(payload) = packet.as_bytes_starting_at_mut(packet_constants::HEADER_SIZE) {
aes_gmac_siv.encrypt_first_pass(payload);
aes_gmac_siv.encrypt_first_pass_finish();
aes_gmac_siv.encrypt_second_pass_in_place(payload);
let tag = aes_gmac_siv.encrypt_second_pass_finish();
let header = packet.struct_mut_at::<PacketHeader>(0).unwrap();
header.id = *byte_array_range::<16, 0, 8>(tag);
header.dest = self.identity.address.to_bytes();
header.src = node.identity.address.to_bytes();
header.flags_cipher_hops = flags_cipher_hops;
header.mac = *byte_array_range::<16, 8, 8>(tag);
pub(crate) async fn send(&self, si: &SI, path: Option<&Arc<Path<SI>>>, node: &Node<SI>, time_ticks: i64, message_id: MessageId, packet: &mut PacketBuffer) -> bool {
let mut _path_arc = None;
let path = if let Some(path) = path {
path
} else {
_path_arc = self.path(node);
if let Some(path) = _path_arc.as_ref() {
path
} else {
return false;
}
};
if self.internal_send(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), max_fragment_size, packet).await {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
let max_fragment_size = if path.endpoint.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX };
let flags_cipher_hops = if packet.len() > max_fragment_size { packet_constants::HEADER_FLAG_FRAGMENTED | security_constants::CIPHER_AES_GMAC_SIV } else { security_constants::CIPHER_AES_GMAC_SIV };
let mut aes_gmac_siv = if let Some(ephemeral_key) = self.ephemeral_symmetric_key.read().as_ref() { ephemeral_key.secret.aes_gmac_siv.get() } else { self.identity_symmetric_key.aes_gmac_siv.get() };
aes_gmac_siv.encrypt_init(&message_id.to_ne_bytes());
aes_gmac_siv.encrypt_set_aad(&get_packet_aad_bytes(self.identity.address, node.identity.address, flags_cipher_hops));
if let Ok(payload) = packet.as_bytes_starting_at_mut(packet_constants::HEADER_SIZE) {
aes_gmac_siv.encrypt_first_pass(payload);
aes_gmac_siv.encrypt_first_pass_finish();
aes_gmac_siv.encrypt_second_pass_in_place(payload);
let tag = aes_gmac_siv.encrypt_second_pass_finish();
let header = packet.struct_mut_at::<PacketHeader>(0).unwrap();
header.id = *byte_array_range::<16, 0, 8>(tag);
header.dest = self.identity.address.to_bytes();
header.src = node.identity.address.to_bytes();
header.flags_cipher_hops = flags_cipher_hops;
header.mac = *byte_array_range::<16, 8, 8>(tag);
} else {
return false;
}
if self.internal_send(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), max_fragment_size, packet).await {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
return false;
}
@ -579,12 +592,8 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
// ---------------------------------------------------------------
// If we made it here it decrypted and passed authentication.
// ---------------------------------------------------------------
if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 {
let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() };
let mut decompressed_payload = [0_u8; packet_constants::SIZE_MAX];
decompressed_payload[0] = verb;
if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
payload.set_to(&decompressed_payload[..(dlen + 1)]);
@ -593,6 +602,10 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
// ---------------------------------------------------------------
// If we made it here it decrypted and passed authentication.
// ---------------------------------------------------------------
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
let mut path_is_known = false;
@ -607,9 +620,9 @@ impl<SI: SystemInterface> Peer<SI> {
verb &= packet_constants::VERB_MASK; // mask off flags
debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), verb as u32);
return match verb {
if match verb {
verbs::VL1_NOP => true,
verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, message_id, source_path, extended_authentication, &payload).await,
verbs::VL1_HELLO => self.handle_incoming_hello(si, ph, node, time_ticks, message_id, source_path, packet_header.hops(), extended_authentication, &payload).await,
verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
@ -618,13 +631,23 @@ impl<SI: SystemInterface> Peer<SI> {
verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
_ => ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await,
};
} {
// This needs to be saved AFTER processing the packet since some message types may use it to try to filter for replays.
self.last_incoming_message_id.store(message_id, Ordering::Relaxed);
return true;
}
}
}
return false;
}
async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, message_id: MessageId, source_path: &Arc<Path<SI>>, extended_authentication: bool, payload: &PacketBuffer) -> bool {
async fn handle_incoming_hello<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, message_id: MessageId, source_path: &Arc<Path<SI>>, hops: u8, extended_authentication: bool, payload: &PacketBuffer) -> bool {
if !(ph.has_trust_relationship(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
debug_event!(si, "[vl1] dropping HELLO from {} due to lack of trust relationship", self.identity.address.to_string());
return true;
}
let mut cursor = 0;
if let Ok(hello_fixed_headers) = payload.read_struct::<message_component_structs::HelloFixedHeaderFields>(&mut cursor) {
if let Ok(identity) = Identity::unmarshal(payload, &mut cursor) {
@ -679,7 +702,9 @@ impl<SI: SystemInterface> Peer<SI> {
let mut session_metadata = Dictionary::new();
session_metadata.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
session_metadata.set_bytes(session_metadata::CARE_OF, node.care_of_bytes());
session_metadata.set_bytes(session_metadata::SENT_TO, source_path.endpoint.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
if hops == 0 {
session_metadata.set_bytes(session_metadata::DIRECT_SOURCE, source_path.endpoint.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
}
if let Some(my_root_sets) = node.my_root_sets() {
session_metadata.set_bytes(session_metadata::MY_ROOT_SETS, my_root_sets);
}
@ -689,7 +714,7 @@ impl<SI: SystemInterface> Peer<SI> {
assert!(packet.append_bytes(session_metadata.as_slice()).is_ok());
}
return self.send(si, node, time_ticks, self.next_message_id(), &mut packet).await;
return self.send(si, Some(source_path), node, time_ticks, self.next_message_id(), &mut packet).await;
}
}
}
@ -733,15 +758,21 @@ impl<SI: SystemInterface> Peer<SI> {
verbs::VL1_HELLO => {
if let Ok(ok_hello_fixed_header_fields) = payload.read_struct::<message_component_structs::OkHelloFixedHeaderFields>(&mut cursor) {
if ok_hello_fixed_header_fields.version_proto >= 20 {
// V2 nodes send a whole dictionary for easy extensibiility.
if let Ok(session_metadata_len) = payload.read_u16(&mut cursor) {
if session_metadata_len > 0 {
if let Ok(session_metadata) = payload.read_bytes(session_metadata_len as usize, &mut cursor) {
if let Some(session_metadata) = Dictionary::from_bytes(session_metadata) {
let mut remote_node_info = self.remote_node_info.write();
if hops == 0 {
if let Some(reported_endpoint) = session_metadata.get_bytes(session_metadata::SENT_TO) {
if let Some(reported_endpoint) = session_metadata.get_bytes(session_metadata::DIRECT_SOURCE) {
if let Some(reported_endpoint) = Endpoint::from_bytes(reported_endpoint) {
let _ = remote_node_info.reported_local_endpoints.insert(reported_endpoint, time_ticks);
#[cfg(debug_assertions)]
let reported_endpoint2 = reported_endpoint.clone();
if remote_node_info.reported_local_endpoints.insert(reported_endpoint, time_ticks).is_none() {
#[cfg(debug_assertions)]
debug_event!(si, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string());
}
}
}
}
@ -755,9 +786,15 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
} else {
// V1 nodes just append the endpoint to which they sent the packet, and we can still use that.
if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) {
if hops == 0 {
let _ = self.remote_node_info.write().reported_local_endpoints.insert(reported_endpoint, time_ticks);
#[cfg(debug_assertions)]
let reported_endpoint2 = reported_endpoint.clone();
if self.remote_node_info.write().reported_local_endpoints.insert(reported_endpoint, time_ticks).is_none() {
#[cfg(debug_assertions)]
debug_event!(si, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string());
}
}
}
}

View file

@ -47,6 +47,8 @@ use crate::vl1::Address;
pub const PROTOCOL_VERSION: u8 = 20;
/// Minimum peer protocol version supported.
///
/// We could probably push it back to 8 or 9 with some added support for sending Salsa/Poly packets.
pub const PROTOCOL_VERSION_MIN: u8 = 11;
/// Buffer sized for ZeroTier packets.
@ -239,8 +241,8 @@ pub mod session_metadata {
/// Random 128-bit ID generated at node startup, allows multiple instances to share an identity and be differentiated.
pub const INSTANCE_ID: &'static str = "i";
/// Endpoint to which HELLO packet was sent.
pub const SENT_TO: &'static str = "d";
/// Endpoint from which HELLO was received, sent with OK(HELLO) if hops is zero.
pub const DIRECT_SOURCE: &'static str = "s";
/// Signed bundle of identity fingerprints through which this node can be reached.
pub const CARE_OF: &'static str = "c";