Some reorg.

Adam Ierymenko 2022-06-24 10:52:00 -04:00
parent 1da011b75e
commit d029c34b91
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 143 additions and 135 deletions


@@ -55,6 +55,8 @@ pub struct Peer<SI: SystemInterface> {
     pub(crate) last_hello_reply_time_ticks: AtomicI64,
     last_forward_time_ticks: AtomicI64,
     create_time_ticks: i64,
+    // A random offset added to timestamps sent with HELLO to measure latency and to avoid advertising the actual tick counter.
     random_ticks_offset: u64,
     // Counter for assigning sequential message IDs.
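
A note on the new comment above `random_ticks_offset`: the idea is that HELLO carries `ticks + offset` rather than the raw tick counter, and because the sender knows its own offset it can still recover round-trip latency when the value is echoed back. Below is a minimal standalone sketch of that arithmetic under that reading of the comment; `LatencyClock`, `obscured_timestamp`, and `round_trip_ticks` are illustrative names, not part of this codebase.

```rust
// Hypothetical, self-contained sketch -- not the actual Peer implementation.
struct LatencyClock {
    // In the real struct this is randomly generated per peer; fixed here for the example.
    random_ticks_offset: u64,
}

impl LatencyClock {
    /// Timestamp to place in an outbound HELLO: real ticks plus the random offset.
    fn obscured_timestamp(&self, now_ticks: i64) -> u64 {
        (now_ticks as u64).wrapping_add(self.random_ticks_offset)
    }

    /// Round-trip time computed when the obscured timestamp is echoed back.
    fn round_trip_ticks(&self, echoed: u64, now_ticks: i64) -> i64 {
        let sent_at = echoed.wrapping_sub(self.random_ticks_offset) as i64;
        (now_ticks - sent_at).max(0)
    }
}

fn main() {
    let clock = LatencyClock { random_ticks_offset: 0x9e3779b97f4a7c15 };
    let sent = clock.obscured_timestamp(1_000);
    // Suppose the reply arrives 35 ticks later.
    assert_eq!(clock.round_trip_ticks(sent, 1_035), 35);
    println!("latency measured without exposing the raw tick counter");
}
```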
@@ -148,6 +150,11 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
     }
 }

+/// Sort a list of paths by quality or priority, with best paths first.
+fn prioritize_paths<SI: SystemInterface>(paths: &mut Vec<PeerPath<SI>>) {
+    paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
+}
+
 impl<SI: SystemInterface> Peer<SI> {
     /// Create a new peer.
     ///
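
For reference, the ordering the new free function produces: sorting on `last_receive_time_ticks` with `.reverse()` puts the most recently active path at index 0, which is what the rest of the code treats as the best path. A stripped-down, self-contained sketch (the real `PeerPath` is generic over `SystemInterface`; this stand-in keeps only the field the sort uses):

```rust
#[derive(Debug)]
struct PeerPath {
    last_receive_time_ticks: i64,
}

/// Same comparator as above: most recently heard-from path first.
fn prioritize_paths(paths: &mut Vec<PeerPath>) {
    paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}

fn main() {
    let mut paths = vec![
        PeerPath { last_receive_time_ticks: 100 },
        PeerPath { last_receive_time_ticks: 900 },
        PeerPath { last_receive_time_ticks: 400 },
    ];
    prioritize_paths(&mut paths);
    assert_eq!(paths[0].last_receive_time_ticks, 900); // best path is now first
    println!("{:?}", paths);
}
```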
@@ -212,7 +219,7 @@ impl<SI: SystemInterface> Peer<SI> {
         return None;
     }

-    /// Get either the current best direct path or an indirect path.
+    /// Get either the current best direct path or an indirect path via e.g. a root.
     pub fn path(&self, node: &Node<SI>) -> Option<Arc<Path<SI>>> {
         let direct_path = self.direct_path();
         if direct_path.is_some() {
@@ -224,20 +231,18 @@ impl<SI: SystemInterface> Peer<SI> {
         return None;
     }

-    /// Sort a list of paths by quality or priority, with best paths first.
-    fn prioritize_paths(paths: &mut Vec<PeerPath<SI>>) {
-        paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
-    }
-
     pub(crate) fn learn_path(&self, si: &SI, new_path: &Arc<Path<SI>>, time_ticks: i64) {
         let mut paths = self.paths.lock();

+        // If this is an IpUdp endpoint, scan the existing paths and replace any that come from
+        // the same IP address but a different port. This prevents the accumulation of duplicate
+        // paths to the same peer over different ports.
         match &new_path.endpoint {
             Endpoint::IpUdp(new_ip) => {
-                // If this is an IpUdp endpoint, scan the existing paths and replace any that come from
-                // the same IP address but a different port. This prevents the accumulation of duplicate
-                // paths to the same peer over different ports.
                 for pi in paths.iter_mut() {
+                    if pi.canonical_instance_id == new_path.canonical.canonical_instance_id() {
+                        return;
+                    }
                     if let Some(p) = pi.path.upgrade() {
                         match &p.endpoint {
                             Endpoint::IpUdp(existing_ip) => {
@@ -246,7 +251,7 @@ impl<SI: SystemInterface> Peer<SI> {
                                     pi.path = Arc::downgrade(new_path);
                                     pi.canonical_instance_id = new_path.canonical.canonical_instance_id();
                                     pi.last_receive_time_ticks = time_ticks;
-                                    Self::prioritize_paths(&mut paths);
+                                    prioritize_paths(&mut paths);
                                     return;
                                 }
                             }
@@ -255,17 +260,23 @@ impl<SI: SystemInterface> Peer<SI> {
                     }
                 }
             }
-            _ => {}
+            _ => {
+                for pi in paths.iter() {
+                    if pi.canonical_instance_id == new_path.canonical.canonical_instance_id() {
+                        return;
+                    }
+                }
+            }
         }

-        // Otherwise learn new path.
+        // Learn new path if it's not a duplicate or should not replace an existing path.
         debug_event!(si, "[vl1] {} learned new path: {}", self.identity.address.to_string(), new_path.endpoint.to_string());
         paths.push(PeerPath::<SI> {
             path: Arc::downgrade(new_path),
             canonical_instance_id: new_path.canonical.canonical_instance_id(),
             last_receive_time_ticks: time_ticks,
         });
-        Self::prioritize_paths(&mut paths);
+        prioritize_paths(&mut paths);
     }

     /// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
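
Taken together, the `learn_path()` changes above amount to two rules: a path whose canonical instance ID is already known is ignored, and an IP/UDP path from a known IP but a new port replaces the old entry instead of accumulating next to it. A self-contained sketch of those rules using simplified stand-in types (`SocketAddr` and a plain `u64` instead of `Endpoint` and the canonical instance ID):

```rust
use std::net::SocketAddr;

struct LearnedPath {
    addr: SocketAddr,
    canonical_instance_id: u64,
    last_receive_time_ticks: i64,
}

fn learn_path(paths: &mut Vec<LearnedPath>, new: LearnedPath) {
    // Rule 1: exact duplicate of an already known path -- nothing to do.
    if paths.iter().any(|p| p.canonical_instance_id == new.canonical_instance_id) {
        return;
    }
    // Rule 2: same IP but a different port -- replace in place rather than accumulate.
    if let Some(existing) = paths.iter_mut().find(|p| p.addr.ip() == new.addr.ip()) {
        *existing = new;
    } else {
        paths.push(new);
    }
    // Keep the most recently active path first (see prioritize_paths above).
    paths.sort_unstable_by(|a, b| b.last_receive_time_ticks.cmp(&a.last_receive_time_ticks));
}

fn main() {
    let mut paths = Vec::new();
    learn_path(&mut paths, LearnedPath { addr: "10.0.0.1:9993".parse().unwrap(), canonical_instance_id: 1, last_receive_time_ticks: 10 });
    learn_path(&mut paths, LearnedPath { addr: "10.0.0.1:40000".parse().unwrap(), canonical_instance_id: 2, last_receive_time_ticks: 20 });
    assert_eq!(paths.len(), 1); // the port-40000 path replaced the port-9993 one
    assert_eq!(paths[0].addr.port(), 40000);
}
```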
@@ -273,118 +284,11 @@ impl<SI: SystemInterface> Peer<SI> {
         {
             let mut paths = self.paths.lock();
             paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
-            Self::prioritize_paths(&mut paths);
+            prioritize_paths(&mut paths);
         }
         (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
     }

-    /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
-    ///
-    /// If the packet comes in multiple fragments, the fragments slice should contain all
-    /// those fragments after the main packet header and first chunk.
-    ///
-    /// This returns true if the packet decrypted and passed authentication.
-    pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, packet_header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
-        if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
-            //let mut payload = unsafe { PacketBuffer::new_without_memzero() };
-            let mut payload = PacketBuffer::new();
-
-            // First try decrypting and authenticating with an ephemeral secret if one is negotiated.
-            let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_symmetric_key.read().as_ref() {
-                if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
-                    // Decryption successful with ephemeral secret
-                    (true, message_id)
-                } else {
-                    // Decryption failed with ephemeral secret, which may indicate that it's obsolete.
-                    (false, 0)
-                }
-            } else {
-                // There is no ephemeral secret negotiated (yet?).
-                (false, 0)
-            };
-
-            // If forward_secrecy is false it means the ephemeral key failed. Try decrypting with the permanent key.
-            if !forward_secrecy {
-                payload.clear();
-                if let Some(message_id2) = try_aead_decrypt(&self.identity_symmetric_key, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
-                    // Decryption successful with static secret.
-                    message_id = message_id2;
-                } else {
-                    // Packet failed to decrypt using either ephemeral or permanent key, reject.
-                    debug_event!(si, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id));
-                    return;
-                }
-            }
-
-            if let Ok(mut verb) = payload.u8_at(0) {
-                let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
-                if extended_authentication {
-                    if payload.len() >= SHA384_HASH_SIZE {
-                        let actual_end_of_payload = payload.len() - SHA384_HASH_SIZE;
-                        let mut hmac = HMACSHA384::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
-                        hmac.update(&node.identity.fingerprint);
-                        hmac.update(&self.identity.fingerprint);
-                        hmac.update(&message_id.to_ne_bytes());
-                        hmac.update(&payload.as_bytes()[..actual_end_of_payload]);
-                        if !hmac.finish().eq(&payload.as_bytes()[actual_end_of_payload..]) {
-                            return;
-                        }
-                        payload.set_size(actual_end_of_payload);
-                    } else {
-                        return;
-                    }
-                }
-
-                // ---------------------------------------------------------------
-                // If we made it here it decrypted and passed authentication.
-                // ---------------------------------------------------------------
-
-                debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), (verb & packet_constants::VERB_MASK) as u32);
-
-                if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 {
-                    let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() };
-                    decompressed_payload[0] = verb;
-                    if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
-                        payload.set_to(&decompressed_payload[..(dlen + 1)]);
-                    } else {
-                        return;
-                    }
-                }
-
-                self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
-
-                let mut path_is_known = false;
-                for p in self.paths.lock().iter_mut() {
-                    if p.canonical_instance_id == source_path.canonical.canonical_instance_id() {
-                        p.last_receive_time_ticks = time_ticks;
-                        path_is_known = true;
-                        break;
-                    }
-                }
-
-                // For performance reasons we let VL2 handle packets first. It returns false
-                // if it didn't handle the packet, in which case it's handled at VL1. This is
-                // because the most performance critical path is the handling of the ???_FRAME
-                // verbs, which are in VL2.
-                verb &= packet_constants::VERB_MASK; // mask off flags
-                if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
-                    match verb {
-                        //VERB_VL1_NOP => {}
-                        verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
-                        verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
-                        verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_ECHO => self.handle_incoming_echo(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
-                        _ => {}
-                    }
-                }
-            }
-        }
-    }

     /// Send to an endpoint, fragmenting if needed.
     ///
     /// This does not set the fragmentation field in the packet header, MAC, or encrypt the packet. The sender
@@ -575,6 +479,113 @@ impl<SI: SystemInterface> Peer<SI> {
         }
     }

+    /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
+    ///
+    /// If the packet comes in multiple fragments, the fragments slice should contain all
+    /// those fragments after the main packet header and first chunk.
+    ///
+    /// This returns true if the packet decrypted and passed authentication.
+    pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, packet_header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
+        if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
+            //let mut payload = unsafe { PacketBuffer::new_without_memzero() };
+            let mut payload = PacketBuffer::new();
+
+            // First try decrypting and authenticating with an ephemeral secret if one is negotiated.
+            let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_symmetric_key.read().as_ref() {
+                if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
+                    // Decryption successful with ephemeral secret
+                    (true, message_id)
+                } else {
+                    // Decryption failed with ephemeral secret, which may indicate that it's obsolete.
+                    (false, 0)
+                }
+            } else {
+                // There is no ephemeral secret negotiated (yet?).
+                (false, 0)
+            };
+
+            // If forward_secrecy is false it means the ephemeral key failed. Try decrypting with the permanent key.
+            if !forward_secrecy {
+                payload.clear();
+                if let Some(message_id2) = try_aead_decrypt(&self.identity_symmetric_key, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
+                    // Decryption successful with static secret.
+                    message_id = message_id2;
+                } else {
+                    // Packet failed to decrypt using either ephemeral or permanent key, reject.
+                    debug_event!(si, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id));
+                    return;
+                }
+            }
+
+            if let Ok(mut verb) = payload.u8_at(0) {
+                let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
+                if extended_authentication {
+                    if payload.len() >= SHA384_HASH_SIZE {
+                        let actual_end_of_payload = payload.len() - SHA384_HASH_SIZE;
+                        let mut hmac = HMACSHA384::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
+                        hmac.update(&node.identity.fingerprint);
+                        hmac.update(&self.identity.fingerprint);
+                        hmac.update(&message_id.to_ne_bytes());
+                        hmac.update(&payload.as_bytes()[..actual_end_of_payload]);
+                        if !hmac.finish().eq(&payload.as_bytes()[actual_end_of_payload..]) {
+                            return;
+                        }
+                        payload.set_size(actual_end_of_payload);
+                    } else {
+                        return;
+                    }
+                }
+
+                // ---------------------------------------------------------------
+                // If we made it here it decrypted and passed authentication.
+                // ---------------------------------------------------------------
+
+                debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), (verb & packet_constants::VERB_MASK) as u32);
+
+                if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 {
+                    let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() };
+                    decompressed_payload[0] = verb;
+                    if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
+                        payload.set_to(&decompressed_payload[..(dlen + 1)]);
+                    } else {
+                        return;
+                    }
+                }
+
+                self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
+
+                let mut path_is_known = false;
+                for p in self.paths.lock().iter_mut() {
+                    if p.canonical_instance_id == source_path.canonical.canonical_instance_id() {
+                        p.last_receive_time_ticks = time_ticks;
+                        path_is_known = true;
+                        break;
+                    }
+                }
+
+                // For performance reasons we let VL2 handle packets first. It returns false
+                // if it didn't handle the packet, in which case it's handled at VL1. This is
+                // because the most performance critical path is the handling of the ???_FRAME
+                // verbs, which are in VL2.
+                verb &= packet_constants::VERB_MASK; // mask off flags
+                if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
+                    match verb {
+                        //VERB_VL1_NOP => {}
+                        verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
+                        verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
+                        verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_ECHO => self.handle_incoming_echo(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
+                        _ => {}
+                    }
+                }
+            }
+        }
+    }
+
     #[allow(unused)]
     async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
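
The relocated `receive()` keeps its two-stage decryption order: the negotiated ephemeral key is tried first (success sets `forward_secrecy`), and only if that fails is the long-lived identity key used. A standalone sketch of that fallback; the AEAD is abstracted behind a caller-supplied function because the real `try_aead_decrypt` and key types are internal to this crate, and all names here are illustrative.

```rust
struct Keys {
    ephemeral: Option<[u8; 32]>, // present only once a key has been negotiated
    identity: [u8; 32],          // long-lived static key
}

/// Returns the plaintext and whether it was protected by the ephemeral key.
fn decrypt_with_fallback(
    keys: &Keys,
    packet: &[u8],
    try_decrypt: impl Fn(&[u8; 32], &[u8]) -> Option<Vec<u8>>,
) -> Option<(Vec<u8>, bool)> {
    if let Some(ephemeral) = keys.ephemeral.as_ref() {
        if let Some(plaintext) = try_decrypt(ephemeral, packet) {
            return Some((plaintext, true)); // forward secrecy achieved
        }
        // Ephemeral key failed; it may be obsolete, so fall through to the identity key.
    }
    try_decrypt(&keys.identity, packet).map(|plaintext| (plaintext, false))
}

/// Toy "AEAD" for the example: succeeds only when the packet's first byte matches key[0].
fn toy(key: &[u8; 32], packet: &[u8]) -> Option<Vec<u8>> {
    (packet.first() == Some(&key[0])).then(|| packet[1..].to_vec())
}

fn main() {
    let keys = Keys { ephemeral: Some([2u8; 32]), identity: [1u8; 32] };
    assert_eq!(decrypt_with_fallback(&keys, &[2, 42], toy), Some((vec![42], true)));
    assert_eq!(decrypt_with_fallback(&keys, &[1, 42], toy), Some((vec![42], false)));
    assert_eq!(decrypt_with_fallback(&keys, &[9, 42], toy), None);
}
```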


@@ -34,7 +34,7 @@ pub struct Service {
 struct ServiceImpl {
     pub rt: tokio::runtime::Handle,
     pub data: DataDir,
-    pub udp_sockets: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
+    pub udp_sockets_by_port: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
     pub num_listeners_per_socket: usize,
     _core: Option<NetworkHypervisor<Self>>,
 }
@@ -49,7 +49,7 @@ impl Drop for Service {
         // This shouldn't have to loop much if at all to acquire the lock, but it might if something
         // is still completing somewhere in an aborting task.
         loop {
-            if let Ok(mut udp_sockets) = self.internal.udp_sockets.try_write() {
+            if let Ok(mut udp_sockets) = self.internal.udp_sockets_by_port.try_write() {
                 udp_sockets.clear();
                 break;
             }
@@ -67,7 +67,7 @@ impl Service {
         let mut si = ServiceImpl {
             rt,
             data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
-            udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
+            udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
             num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
             _core: None,
         };
@@ -93,8 +93,8 @@ impl ServiceImpl {
     /// Called in udp_binding_task_main() to service a particular UDP port.
     async fn update_udp_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
         for ns in {
-            let mut udp_sockets = self.udp_sockets.write().await;
-            let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
+            let mut udp_sockets_by_port = self.udp_sockets_by_port.write().await;
+            let bp = udp_sockets_by_port.entry(port).or_insert_with(|| BoundUdpPort::new(port));
             let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
             if bp.sockets.is_empty() {
                 return Some(errors);
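
The map is keyed by port, and `entry(port).or_insert_with(...)` above creates the `BoundUdpPort` lazily the first time a port is serviced, then reuses it on later passes. A small self-contained illustration of that pattern with a simplified `BoundUdpPort` (plain `HashMap`, no tokio lock):

```rust
use std::collections::HashMap;

struct BoundUdpPort {
    port: u16,
    sockets: Vec<String>, // stand-in for the per-interface bound sockets
}

impl BoundUdpPort {
    fn new(port: u16) -> Self {
        Self { port, sockets: Vec::new() }
    }
}

fn main() {
    let mut udp_sockets_by_port: HashMap<u16, BoundUdpPort> = HashMap::with_capacity(4);

    // The first pass for port 9993 creates the entry; later passes reuse it.
    for pass in 0..2 {
        let bp = udp_sockets_by_port.entry(9993).or_insert_with(|| BoundUdpPort::new(9993));
        bp.sockets.push(format!("pass {} binding on port {}", pass, bp.port));
    }

    assert_eq!(udp_sockets_by_port.len(), 1);
    assert_eq!(udp_sockets_by_port[&9993].sockets.len(), 2);
}
```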
@@ -196,12 +196,11 @@ impl SystemInterface for ServiceImpl {
             }
         }

-        // Otherwise we try to send from one socket on every interface or from the specified interface.
-        // This path only happens when the core is trying new endpoints. The fast path is for most packets.
-        let sockets = self.udp_sockets.read().await;
-        if !sockets.is_empty() {
+        let udp_sockets_by_port = self.udp_sockets_by_port.read().await;
+        if !udp_sockets_by_port.is_empty() {
             if let Some(specific_interface) = local_interface {
-                for (_, p) in sockets.iter() {
+                // Send from a specific interface if that interface is specified.
+                for (_, p) in udp_sockets_by_port.iter() {
                     if !p.sockets.is_empty() {
                         let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
                         for _ in 0..p.sockets.len() {
@@ -216,11 +215,9 @@
                     }
                 }
             } else {
-                let bound_ports: Vec<&u16> = sockets.keys().collect();
+                // Otherwise send from one socket on every interface.
                 let mut sent_on_interfaces = HashSet::with_capacity(4);
-                let rn = random::xorshift64_random() as usize;
-                for i in 0..bound_ports.len() {
-                    let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap();
+                for p in udp_sockets_by_port.values() {
                     if !p.sockets.is_empty() {
                         let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
                         for _ in 0..p.sockets.len() {
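
The replacement loop in this last hunk keeps the original socket-selection idea: start at a random index, then walk the socket list once so every socket gets a chance before the send is given up on. A standalone sketch of that loop with a stand-in `send` callback; the function and names here are illustrative, not from the service code.

```rust
/// Try each socket once, starting at `start`, until one send succeeds.
fn send_from_any_socket<S>(sockets: &[S], start: usize, mut send: impl FnMut(&S) -> bool) -> bool {
    if sockets.is_empty() {
        return false;
    }
    let mut i = start % sockets.len(); // in the real code, `start` comes from a secure RNG
    for _ in 0..sockets.len() {
        if send(&sockets[i]) {
            return true;
        }
        i = (i + 1) % sockets.len();
    }
    false
}

fn main() {
    let sockets = ["s0", "s1", "s2"];
    // Pretend only "s2" can currently send (e.g. the other interfaces are down).
    assert!(send_from_any_socket(&sockets, 1, |s| *s == "s2"));
    assert!(!send_from_any_socket(&sockets, 0, |_| false));
}
```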