From be157ccd314de05697347bef2df92f008e585b33 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 30 Jun 2022 13:56:55 -0400 Subject: [PATCH] More reorg, wiring through learning our external address by proxy. --- zerotier-network-hypervisor/src/vl1/peer.rs | 123 ++++++++++++------ .../src/vl1/protocol.rs | 6 +- 2 files changed, 84 insertions(+), 45 deletions(-) diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 5f2945e82..8341491ae 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -48,7 +48,7 @@ pub struct Peer { canonical: CanonicalObject, // This peer's identity. - pub(crate) identity: Identity, + pub identity: Identity, // Static shared secret computed from agreement with identity. identity_symmetric_key: SymmetricSecret, @@ -60,13 +60,14 @@ pub struct Peer { paths: Mutex>>, // Statistics and times of events. - last_send_time_ticks: AtomicI64, - last_receive_time_ticks: AtomicI64, + pub(crate) last_send_time_ticks: AtomicI64, + pub(crate) last_receive_time_ticks: AtomicI64, pub(crate) last_hello_reply_time_ticks: AtomicI64, - last_forward_time_ticks: AtomicI64, - create_time_ticks: i64, + pub(crate) last_forward_time_ticks: AtomicI64, + pub(crate) last_incoming_message_id: AtomicU64, + pub(crate) create_time_ticks: i64, - // A random offset added to timestamps sent with HELLO to measure latency avoid advertising the actual tick counter. + // A random offset added to ticks sent to this peer to avoid disclosing the actual tick count. random_ticks_offset: u64, // Counter for assigning sequential message IDs. @@ -195,6 +196,7 @@ impl Peer { last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), last_hello_reply_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), + last_incoming_message_id: AtomicU64::new(0), create_time_ticks: time_ticks, random_ticks_offset: next_u64_secure(), message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)), @@ -368,34 +370,45 @@ impl Peer { /// via a root or some other route. /// /// It encrypts and sets the MAC and cipher fields and packet ID and other things. - pub(crate) async fn send(&self, si: &SI, node: &Node, time_ticks: i64, message_id: MessageId, packet: &mut PacketBuffer) -> bool { - if let Some(path) = self.path(node) { - let max_fragment_size = if path.endpoint.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX }; - let flags_cipher_hops = if packet.len() > max_fragment_size { packet_constants::HEADER_FLAG_FRAGMENTED | security_constants::CIPHER_AES_GMAC_SIV } else { security_constants::CIPHER_AES_GMAC_SIV }; - - let mut aes_gmac_siv = if let Some(ephemeral_key) = self.ephemeral_symmetric_key.read().as_ref() { ephemeral_key.secret.aes_gmac_siv.get() } else { self.identity_symmetric_key.aes_gmac_siv.get() }; - aes_gmac_siv.encrypt_init(&message_id.to_ne_bytes()); - aes_gmac_siv.encrypt_set_aad(&get_packet_aad_bytes(self.identity.address, node.identity.address, flags_cipher_hops)); - if let Ok(payload) = packet.as_bytes_starting_at_mut(packet_constants::HEADER_SIZE) { - aes_gmac_siv.encrypt_first_pass(payload); - aes_gmac_siv.encrypt_first_pass_finish(); - aes_gmac_siv.encrypt_second_pass_in_place(payload); - let tag = aes_gmac_siv.encrypt_second_pass_finish(); - let header = packet.struct_mut_at::(0).unwrap(); - header.id = *byte_array_range::<16, 0, 8>(tag); - header.dest = self.identity.address.to_bytes(); - header.src = node.identity.address.to_bytes(); - header.flags_cipher_hops = flags_cipher_hops; - header.mac = *byte_array_range::<16, 8, 8>(tag); + pub(crate) async fn send(&self, si: &SI, path: Option<&Arc>>, node: &Node, time_ticks: i64, message_id: MessageId, packet: &mut PacketBuffer) -> bool { + let mut _path_arc = None; + let path = if let Some(path) = path { + path + } else { + _path_arc = self.path(node); + if let Some(path) = _path_arc.as_ref() { + path } else { return false; } + }; - if self.internal_send(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), max_fragment_size, packet).await { - self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } + let max_fragment_size = if path.endpoint.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX }; + let flags_cipher_hops = if packet.len() > max_fragment_size { packet_constants::HEADER_FLAG_FRAGMENTED | security_constants::CIPHER_AES_GMAC_SIV } else { security_constants::CIPHER_AES_GMAC_SIV }; + + let mut aes_gmac_siv = if let Some(ephemeral_key) = self.ephemeral_symmetric_key.read().as_ref() { ephemeral_key.secret.aes_gmac_siv.get() } else { self.identity_symmetric_key.aes_gmac_siv.get() }; + aes_gmac_siv.encrypt_init(&message_id.to_ne_bytes()); + aes_gmac_siv.encrypt_set_aad(&get_packet_aad_bytes(self.identity.address, node.identity.address, flags_cipher_hops)); + if let Ok(payload) = packet.as_bytes_starting_at_mut(packet_constants::HEADER_SIZE) { + aes_gmac_siv.encrypt_first_pass(payload); + aes_gmac_siv.encrypt_first_pass_finish(); + aes_gmac_siv.encrypt_second_pass_in_place(payload); + let tag = aes_gmac_siv.encrypt_second_pass_finish(); + let header = packet.struct_mut_at::(0).unwrap(); + header.id = *byte_array_range::<16, 0, 8>(tag); + header.dest = self.identity.address.to_bytes(); + header.src = node.identity.address.to_bytes(); + header.flags_cipher_hops = flags_cipher_hops; + header.mac = *byte_array_range::<16, 8, 8>(tag); + } else { + return false; } + + if self.internal_send(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), max_fragment_size, packet).await { + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + return true; + } + return false; } @@ -579,12 +592,8 @@ impl Peer { } } - // --------------------------------------------------------------- - // If we made it here it decrypted and passed authentication. - // --------------------------------------------------------------- - if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 { - let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() }; + let mut decompressed_payload = [0_u8; packet_constants::SIZE_MAX]; decompressed_payload[0] = verb; if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) { payload.set_to(&decompressed_payload[..(dlen + 1)]); @@ -593,6 +602,10 @@ impl Peer { } } + // --------------------------------------------------------------- + // If we made it here it decrypted and passed authentication. + // --------------------------------------------------------------- + self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); let mut path_is_known = false; @@ -607,9 +620,9 @@ impl Peer { verb &= packet_constants::VERB_MASK; // mask off flags debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), verb as u32); - return match verb { + if match verb { verbs::VL1_NOP => true, - verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, message_id, source_path, extended_authentication, &payload).await, + verbs::VL1_HELLO => self.handle_incoming_hello(si, ph, node, time_ticks, message_id, source_path, packet_header.hops(), extended_authentication, &payload).await, verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await, verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await, verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await, @@ -618,13 +631,23 @@ impl Peer { verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await, verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await, _ => ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await, - }; + } { + // This needs to be saved AFTER processing the packet since some message types may use it to try to filter for replays. + self.last_incoming_message_id.store(message_id, Ordering::Relaxed); + + return true; + } } } return false; } - async fn handle_incoming_hello(&self, si: &SI, node: &Node, time_ticks: i64, message_id: MessageId, source_path: &Arc>, extended_authentication: bool, payload: &PacketBuffer) -> bool { + async fn handle_incoming_hello(&self, si: &SI, ph: &PH, node: &Node, time_ticks: i64, message_id: MessageId, source_path: &Arc>, hops: u8, extended_authentication: bool, payload: &PacketBuffer) -> bool { + if !(ph.has_trust_relationship(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + debug_event!(si, "[vl1] dropping HELLO from {} due to lack of trust relationship", self.identity.address.to_string()); + return true; + } + let mut cursor = 0; if let Ok(hello_fixed_headers) = payload.read_struct::(&mut cursor) { if let Ok(identity) = Identity::unmarshal(payload, &mut cursor) { @@ -679,7 +702,9 @@ impl Peer { let mut session_metadata = Dictionary::new(); session_metadata.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec()); session_metadata.set_bytes(session_metadata::CARE_OF, node.care_of_bytes()); - session_metadata.set_bytes(session_metadata::SENT_TO, source_path.endpoint.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); + if hops == 0 { + session_metadata.set_bytes(session_metadata::DIRECT_SOURCE, source_path.endpoint.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); + } if let Some(my_root_sets) = node.my_root_sets() { session_metadata.set_bytes(session_metadata::MY_ROOT_SETS, my_root_sets); } @@ -689,7 +714,7 @@ impl Peer { assert!(packet.append_bytes(session_metadata.as_slice()).is_ok()); } - return self.send(si, node, time_ticks, self.next_message_id(), &mut packet).await; + return self.send(si, Some(source_path), node, time_ticks, self.next_message_id(), &mut packet).await; } } } @@ -733,15 +758,21 @@ impl Peer { verbs::VL1_HELLO => { if let Ok(ok_hello_fixed_header_fields) = payload.read_struct::(&mut cursor) { if ok_hello_fixed_header_fields.version_proto >= 20 { + // V2 nodes send a whole dictionary for easy extensibiility. if let Ok(session_metadata_len) = payload.read_u16(&mut cursor) { if session_metadata_len > 0 { if let Ok(session_metadata) = payload.read_bytes(session_metadata_len as usize, &mut cursor) { if let Some(session_metadata) = Dictionary::from_bytes(session_metadata) { let mut remote_node_info = self.remote_node_info.write(); if hops == 0 { - if let Some(reported_endpoint) = session_metadata.get_bytes(session_metadata::SENT_TO) { + if let Some(reported_endpoint) = session_metadata.get_bytes(session_metadata::DIRECT_SOURCE) { if let Some(reported_endpoint) = Endpoint::from_bytes(reported_endpoint) { - let _ = remote_node_info.reported_local_endpoints.insert(reported_endpoint, time_ticks); + #[cfg(debug_assertions)] + let reported_endpoint2 = reported_endpoint.clone(); + if remote_node_info.reported_local_endpoints.insert(reported_endpoint, time_ticks).is_none() { + #[cfg(debug_assertions)] + debug_event!(si, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string()); + } } } } @@ -755,9 +786,15 @@ impl Peer { } } } else { + // V1 nodes just append the endpoint to which they sent the packet, and we can still use that. if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { if hops == 0 { - let _ = self.remote_node_info.write().reported_local_endpoints.insert(reported_endpoint, time_ticks); + #[cfg(debug_assertions)] + let reported_endpoint2 = reported_endpoint.clone(); + if self.remote_node_info.write().reported_local_endpoints.insert(reported_endpoint, time_ticks).is_none() { + #[cfg(debug_assertions)] + debug_event!(si, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string()); + } } } } diff --git a/zerotier-network-hypervisor/src/vl1/protocol.rs b/zerotier-network-hypervisor/src/vl1/protocol.rs index 67b3f893a..7389157f8 100644 --- a/zerotier-network-hypervisor/src/vl1/protocol.rs +++ b/zerotier-network-hypervisor/src/vl1/protocol.rs @@ -47,6 +47,8 @@ use crate::vl1::Address; pub const PROTOCOL_VERSION: u8 = 20; /// Minimum peer protocol version supported. +/// +/// We could probably push it back to 8 or 9 with some added support for sending Salsa/Poly packets. pub const PROTOCOL_VERSION_MIN: u8 = 11; /// Buffer sized for ZeroTier packets. @@ -239,8 +241,8 @@ pub mod session_metadata { /// Random 128-bit ID generated at node startup, allows multiple instances to share an identity and be differentiated. pub const INSTANCE_ID: &'static str = "i"; - /// Endpoint to which HELLO packet was sent. - pub const SENT_TO: &'static str = "d"; + /// Endpoint from which HELLO was received, sent with OK(HELLO) if hops is zero. + pub const DIRECT_SOURCE: &'static str = "s"; /// Signed bundle of identity fingerprints through which this node can be reached. pub const CARE_OF: &'static str = "c";