From 8681f61de30827b1e9b395246e6440104086182a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 5 Aug 2021 15:49:31 -0400 Subject: [PATCH] Remove potential lock bottleneck from Peer, refactor Pool. --- network-hypervisor/src/util/pool.rs | 83 +++---- network-hypervisor/src/vl1/buffer.rs | 26 ++- network-hypervisor/src/vl1/node.rs | 9 +- network-hypervisor/src/vl1/peer.rs | 309 ++++++++++++--------------- 4 files changed, 199 insertions(+), 228 deletions(-) diff --git a/network-hypervisor/src/util/pool.rs b/network-hypervisor/src/util/pool.rs index fab8b9164..151c90a86 100644 --- a/network-hypervisor/src/util/pool.rs +++ b/network-hypervisor/src/util/pool.rs @@ -4,17 +4,18 @@ use std::sync::{Arc, Weak}; use parking_lot::Mutex; -/// Trait for objects that can be used with Pool. -pub trait Reusable: Default + Sized { - fn reset(&mut self); +/// Trait for objects that create and reset poolable objects. +pub trait PoolFactory { + fn create(&self) -> O; + fn reset(&self, obj: &mut O); } -struct PoolEntry { +struct PoolEntry> { obj: O, - return_pool: Weak>, + return_pool: Weak>, } -type PoolInner = Mutex>>; +struct PoolInner>(F, Mutex>>); /// Container for pooled objects that have been checked out of the pool. /// @@ -26,9 +27,9 @@ type PoolInner = Mutex>>; /// Note that pooled objects are not clonable. If you want to share them use Rc<> /// or Arc<>. #[repr(transparent)] -pub struct Pooled(*mut PoolEntry); +pub struct Pooled>(*mut PoolEntry); -impl Pooled { +impl> Pooled { /// Get a raw pointer to the object wrapped by this pooled object container. /// The returned raw pointer MUST be restored into a Pooled instance with /// from_raw() or memory will leak. @@ -55,7 +56,7 @@ impl Pooled { } } -impl Deref for Pooled { +impl> Deref for Pooled { type Target = O; #[inline(always)] @@ -65,7 +66,7 @@ impl Deref for Pooled { } } -impl AsRef for Pooled { +impl> AsRef for Pooled { #[inline(always)] fn as_ref(&self) -> &O { debug_assert!(!self.0.is_null()); @@ -73,7 +74,7 @@ impl AsRef for Pooled { } } -impl DerefMut for Pooled { +impl> DerefMut for Pooled { #[inline(always)] fn deref_mut(&mut self) -> &mut Self::Target { debug_assert!(!self.0.is_null()); @@ -81,7 +82,7 @@ impl DerefMut for Pooled { } } -impl AsMut for Pooled { +impl> AsMut for Pooled { #[inline(always)] fn as_mut(&mut self) -> &mut O { debug_assert!(!self.0.is_null()); @@ -89,34 +90,34 @@ impl AsMut for Pooled { } } -impl Drop for Pooled { +impl> Drop for Pooled { fn drop(&mut self) { unsafe { Weak::upgrade(&(*self.0).return_pool).map_or_else(|| { drop(Box::from_raw(self.0)) }, |p| { - (*self.0).obj.reset(); - p.lock().push(self.0) + p.0.reset(&mut (*self.0).obj); + p.1.lock().push(self.0) }) } } } /// An object pool for Reusable objects. -/// The pool is safe in that checked out objects return automatically when their Pooled -/// transparent container is dropped, or deallocate if the pool has been dropped. -pub struct Pool(Arc>); +/// Checked out objects are held by a guard object that returns them when dropped if +/// the pool still exists or drops them if the pool has itself been dropped. +pub struct Pool>(Arc>); -impl Pool { - pub fn new(initial_stack_capacity: usize) -> Self { - Self(Arc::new(Mutex::new(Vec::with_capacity(initial_stack_capacity)))) +impl> Pool { + pub fn new(initial_stack_capacity: usize, factory: F) -> Self { + Self(Arc::new(PoolInner::(factory, Mutex::new(Vec::with_capacity(initial_stack_capacity))))) } /// Get a pooled object, or allocate one if the pool is empty. - pub fn get(&self) -> Pooled { - Pooled::(self.0.lock().pop().map_or_else(|| { - Box::into_raw(Box::new(PoolEntry:: { - obj: O::default(), + pub fn get(&self) -> Pooled { + Pooled::(self.0.1.lock().pop().map_or_else(|| { + Box::into_raw(Box::new(PoolEntry:: { + obj: self.0.0.create(), return_pool: Arc::downgrade(&self.0), })) }, |obj| { @@ -128,7 +129,7 @@ impl Pool { /// Get approximate memory use in bytes (does not include checked out objects). #[inline(always)] pub fn pool_memory_bytes(&self) -> usize { - self.0.lock().len() * (size_of::>() + size_of::()) + self.0.1.lock().len() * (size_of::>() + size_of::()) } /// Dispose of all pooled objects, freeing any memory they use. @@ -136,7 +137,7 @@ impl Pool { /// objects will still be returned on drop unless the pool itself is dropped. This can /// be done to free some memory if there has been a spike in memory use. pub fn purge(&self) { - let mut p = self.0.lock(); + let mut p = self.0.1.lock(); for obj in p.iter() { drop(unsafe { Box::from_raw(*obj) }); } @@ -144,15 +145,15 @@ impl Pool { } } -impl Drop for Pool { +impl> Drop for Pool { fn drop(&mut self) { self.purge(); } } -unsafe impl Sync for Pool {} +unsafe impl> Sync for Pool {} -unsafe impl Send for Pool {} +unsafe impl> Send for Pool {} #[cfg(test)] mod tests { @@ -160,24 +161,24 @@ mod tests { use std::ops::DerefMut; use std::time::Duration; - use crate::util::pool::{Reusable, Pool}; + use crate::util::pool::*; use std::sync::Arc; - struct ReusableTestObject(usize); + struct TestPoolFactory; - impl Default for ReusableTestObject { - fn default() -> Self { - Self(0) + impl PoolFactory for TestPoolFactory { + fn create(&self) -> String { + String::new() } - } - impl Reusable for ReusableTestObject { - fn reset(&mut self) {} + fn reset(&self, obj: &mut String) { + obj.clear(); + } } #[test] fn threaded_pool_use() { - let p: Arc> = Arc::new(Pool::new(2)); + let p: Arc> = Arc::new(Pool::new(2, TestPoolFactory{})); let ctr = Arc::new(AtomicUsize::new(0)); for _ in 0..64 { let p2 = p.clone(); @@ -185,10 +186,10 @@ mod tests { let _ = std::thread::spawn(move || { for _ in 0..16384 { let mut o1 = p2.get(); - o1.deref_mut().0 += 1; + o1.push('a'); let mut o2 = p2.get(); drop(o1); - o2.deref_mut().0 += 1; + o2.push('b'); ctr2.fetch_add(1, Ordering::Relaxed); } }); diff --git a/network-hypervisor/src/vl1/buffer.rs b/network-hypervisor/src/vl1/buffer.rs index 6ec29ae44..1b99f5308 100644 --- a/network-hypervisor/src/vl1/buffer.rs +++ b/network-hypervisor/src/vl1/buffer.rs @@ -1,7 +1,6 @@ use std::mem::size_of; use std::io::Write; - -use crate::util::pool::Reusable; +use crate::util::pool::PoolFactory; const OVERFLOW_ERR_MSG: &'static str = "overflow"; @@ -23,13 +22,6 @@ impl Default for Buffer { } } -impl Reusable for Buffer { - #[inline(always)] - fn reset(&mut self) { - self.clear(); - } -} - impl Buffer { #[inline(always)] pub fn new() -> Self { @@ -62,7 +54,7 @@ impl Buffer { /// Get all bytes after a given position. #[inline(always)] - pub fn as_bytes_after(&self, start: usize) -> std::io::Result<&[u8]> { + pub fn as_bytes_starting_at(&self, start: usize) -> std::io::Result<&[u8]> { if start <= self.0 { Ok(&self.1[start..]) } else { @@ -388,3 +380,17 @@ impl AsMut<[u8]> for Buffer { self.as_bytes_mut() } } + +pub struct PooledBufferFactory; + +impl PoolFactory> for PooledBufferFactory { + #[inline(always)] + fn create(&self) -> Buffer { + Buffer::new() + } + + #[inline(always)] + fn reset(&self, obj: &mut Buffer) { + obj.clear(); + } +} diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index ce5bc8d93..94992a5de 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -10,7 +10,7 @@ use crate::error::InvalidParameterError; use crate::util::gate::IntervalGate; use crate::util::pool::{Pool, Pooled}; use crate::vl1::{Address, Endpoint, Identity, Locator}; -use crate::vl1::buffer::Buffer; +use crate::vl1::buffer::{Buffer, PooledBufferFactory}; use crate::vl1::constants::PACKET_SIZE_MAX; use crate::vl1::path::Path; use crate::vl1::peer::Peer; @@ -18,9 +18,10 @@ use crate::vl1::protocol::*; use crate::vl1::whois::WhoisQueue; /// Standard packet buffer type including pool container. -pub type PacketBuffer = Pooled>; +pub type PacketBuffer = Pooled, PooledBufferFactory<{ PACKET_SIZE_MAX }>>; /// Callback interface and call context for calls to the node (for VL1). +/// /// Every non-trivial call takes a reference to this, which it passes all the way through /// the call stack. This can be used to call back into the caller to send packets, get or /// store data, report events, etc. @@ -124,7 +125,7 @@ pub struct Node { paths: DashMap>, peers: DashMap>, whois: WhoisQueue, - buffer_pool: Pool>, + buffer_pool: Pool, PooledBufferFactory<{ PACKET_SIZE_MAX }>>, secure_prng: SecureRandom, } @@ -161,7 +162,7 @@ impl Node { paths: DashMap::new(), peers: DashMap::new(), whois: WhoisQueue::new(), - buffer_pool: Pool::new(64), + buffer_pool: Pool::new(64, PooledBufferFactory), secure_prng: SecureRandom::get(), }) } diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 75abe121c..8265a6b71 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8, Ordering}; use parking_lot::Mutex; use aes_gmac_siv::AesGmacSiv; @@ -10,24 +11,40 @@ use crate::crypto::poly1305::Poly1305; use crate::crypto::random::next_u64_secure; use crate::crypto::salsa::Salsa; use crate::crypto::secret::Secret; +use crate::util::pool::{Pool, PoolFactory}; use crate::vl1::{Identity, Path}; use crate::vl1::buffer::Buffer; use crate::vl1::constants::*; use crate::vl1::node::*; use crate::vl1::protocol::*; -struct PeerSecrets { - // Time secret was created in ticks or -1 for static secrets. +struct AesGmacSivPoolFactory(Secret<48>, Secret<48>); + +impl PoolFactory for AesGmacSivPoolFactory { + #[inline(always)] + fn create(&self) -> AesGmacSiv { + AesGmacSiv::new(self.0.as_ref(), self.1.as_ref()) + } + + #[inline(always)] + fn reset(&self, obj: &mut AesGmacSiv) { + obj.reset(); + } +} + +struct PeerSecret { + // Time secret was created in ticks for ephemeral secrets, or -1 for static secrets. create_time_ticks: i64, - // Number of time secret has been used to encrypt something during this session. - encrypt_count: u64, + // Number of times secret has been used to encrypt something during this session. + encrypt_count: AtomicU64, // Raw secret itself. secret: Secret<48>, - // Reusable AES-GMAC-SIV initialized with secret. - aes: AesGmacSiv, + // Reusable AES-GMAC-SIV ciphers initialized with secret. + // These can't be used concurrently so they're pooled to allow multithreaded use. + aes: Pool, } struct EphemeralKeyPair { @@ -44,50 +61,6 @@ struct EphemeralKeyPair { p521: P521KeyPair, } -struct TxState { - // Time we last sent something to this peer. - last_send_time_ticks: i64, - - // Outgoing packet IV counter, starts at a random position. - packet_iv_counter: u64, - - // Total bytes sent to this peer during this session. - total_bytes: u64, - - // "Eternal" static secret created via identity agreement. - static_secret: PeerSecrets, - - // The most recently negotiated ephemeral secret. - ephemeral_secret: Option, - - // The current ephemeral key pair we will share with HELLO. - ephemeral_pair: Option, - - // Paths to this peer sorted in ascending order of path quality. - paths: Vec>, -} - -struct RxState { - // Time we last received something (authenticated) from this peer. - last_receive_time_ticks: i64, - - // Total bytes received from this peer during this session. - total_bytes: u64, - - // "Eternal" static secret created via identity agreement. - static_secret: PeerSecrets, - - // The most recently negotiated ephemeral secret. - ephemeral_secret: Option, - - // Remote version as major, minor, revision, build in most-to-least-significant 16-bit chunks. - // This is the user-facing software version and is zero if not yet known. - remote_version: u64, - - // Remote protocol version or zero if not yet known. - remote_protocol_version: u8, -} - /// A remote peer known to this node. /// Sending-related and receiving-related fields are locked separately since concurrent /// send/receive is not uncommon. @@ -96,7 +69,7 @@ pub struct Peer { identity: Identity, // Static shared secret computed from agreement with identity. - static_secret: Secret<48>, + static_secret: PeerSecret, // Derived static secret used to encrypt the dictionary part of HELLO. static_secret_hello_dictionary_encrypt: Secret<48>, @@ -104,11 +77,27 @@ pub struct Peer { // Derived static secret used to add full HMAC-SHA384 to packets, currently just HELLO. static_secret_packet_hmac: Secret<48>, - // State used primarily when sending to this peer. - tx: Mutex, + // Latest ephemeral secret acknowledged with OK(HELLO). + ephemeral_secret: Mutex>>, - // State used primarily when receiving from this peer. - rx: Mutex, + // Either None or the current ephemeral key pair whose public keys are on offer. + ephemeral_pair: Mutex>, + + // Statistics + last_send_time_ticks: AtomicI64, + last_receive_time_ticks: AtomicI64, + total_bytes_sent: AtomicU64, + total_bytes_received: AtomicU64, + + // Counter for assigning packet IV's a.k.a. PacketIDs. + packet_iv_counter: AtomicU64, + + // Remote peer version information. + remote_version: AtomicU64, + remote_protocol_version: AtomicU8, + + // Paths sorted in ascending order of quality / preference. + paths: Mutex>>, } /// Derive per-packet key for Sals20/12 encryption (and Poly1305 authentication). @@ -142,42 +131,31 @@ impl Peer { /// fatal error occurs performing key agreement between the two identities. pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option { this_node_identity.agree(&id).map(|static_secret| { - let aes_k0 = zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0, 0, 0); - let aes_k1 = zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1, 0, 0); + let aes_factory = AesGmacSivPoolFactory( + zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0, 0, 0), + zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1, 0, 0)); let static_secret_hello_dictionary_encrypt = zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_HELLO_DICTIONARY_ENCRYPT, 0, 0); let static_secret_packet_hmac = zt_kbkdf_hmac_sha384(&static_secret.0, KBKDF_KEY_USAGE_LABEL_PACKET_HMAC, 0, 0); Peer { identity: id, - static_secret: static_secret.clone(), + static_secret: PeerSecret { + create_time_ticks: -1, + encrypt_count: AtomicU64::new(0), + secret: static_secret, + aes: Pool::new(4, aes_factory), + }, static_secret_hello_dictionary_encrypt, static_secret_packet_hmac, - tx: Mutex::new(TxState { - last_send_time_ticks: 0, - packet_iv_counter: next_u64_secure(), - total_bytes: 0, - static_secret: PeerSecrets { - create_time_ticks: -1, - encrypt_count: 0, - secret: static_secret.clone(), - aes: AesGmacSiv::new(&aes_k0.0, &aes_k1.0), - }, - ephemeral_secret: None, - paths: Vec::with_capacity(4), - ephemeral_pair: None, - }), - rx: Mutex::new(RxState { - last_receive_time_ticks: 0, - total_bytes: 0, - static_secret: PeerSecrets { - create_time_ticks: -1, - encrypt_count: 0, - secret: static_secret, - aes: AesGmacSiv::new(&aes_k0.0, &aes_k1.0), - }, - ephemeral_secret: None, - remote_version: 0, - remote_protocol_version: 0, - }), + ephemeral_secret: Mutex::new(None), + ephemeral_pair: Mutex::new(None), + last_send_time_ticks: AtomicI64::new(0), + last_receive_time_ticks: AtomicI64::new(0), + total_bytes_sent: AtomicU64::new(0), + total_bytes_received: AtomicU64::new(0), + packet_iv_counter: AtomicU64::new(next_u64_secure()), + remote_version: AtomicU64::new(0), + remote_protocol_version: AtomicU8::new(0), + paths: Mutex::new(Vec::new()), } }) } @@ -185,116 +163,101 @@ impl Peer { /// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. + #[inline(always)] pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { - let packet_frag0_payload_bytes = packet.as_bytes_after(PACKET_VERB_INDEX).unwrap_or(&[]); - if !packet_frag0_payload_bytes.is_empty() { + let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); - let mut rx = self.rx.lock(); + let mut forward_secrecy = true; + let cipher = header.cipher(); - // When handling incoming packets we try any current ephemeral secret first, and if that - // fails we fall back to the static secret. If decryption with an ephemeral secret succeeds - // the forward secrecy flag in the receive path is set. - let forward_secrecy = { - let mut secret = if rx.ephemeral_secret.is_some() { rx.ephemeral_secret.as_mut().unwrap() } else { &mut rx.static_secret }; - loop { - match header.cipher() { - CIPHER_NOCRYPT_POLY1305 => { - // Only HELLO is allowed in the clear (but still authenticated). - if (packet_frag0_payload_bytes[0] & VERB_MASK) == VERB_VL1_HELLO { - let _ = payload.append_bytes(packet_frag0_payload_bytes); - - for f in fragments.iter() { - let _ = f.as_ref().map(|f| { - let _ = f.as_bytes_after(FRAGMENT_HEADER_SIZE).map(|f| { - let _ = payload.append_bytes(f); - }); - }); - } - - // FIPS note: for FIPS purposes the HMAC-SHA384 tag at the end of V2 HELLOs - // will be considered the "real" handshake authentication. - let key = salsa_derive_per_packet_key(&secret.secret, header, payload.len()); - let mut salsa = Salsa::new(&key.0[0..32], header.id_bytes(), true).unwrap(); - let mut poly1305_key = [0_u8; 32]; - salsa.crypt_in_place(&mut poly1305_key); - let mut poly = Poly1305::new(&poly1305_key).unwrap(); - poly.update(packet_frag0_payload_bytes); - - if poly.finish()[0..8].eq(&header.message_auth) { - break; - } + let ephemeral_secret = self.ephemeral_secret.lock().clone(); + for secret in [ephemeral_secret.as_ref().map_or(&self.static_secret, |s| s.as_ref()), &self.static_secret] { + match cipher { + CIPHER_NOCRYPT_POLY1305 => { + if (packet_frag0_payload_bytes[0] & VERB_MASK) == VERB_VL1_HELLO { + let _ = payload.append_bytes(packet_frag0_payload_bytes); + for f in fragments.iter() { + let _ = f.as_ref().map(|f| f.as_bytes_starting_at(FRAGMENT_HEADER_SIZE).map(|f| payload.append_bytes(f))); } - } - CIPHER_SALSA2012_POLY1305 => { - // FIPS note: support for this mode would have to be disabled in FIPS compliant - // modes of operation. + // FIPS note: for FIPS purposes the HMAC-SHA384 tag at the end of V2 HELLOs + // will be considered the "real" handshake authentication. let key = salsa_derive_per_packet_key(&secret.secret, header, payload.len()); let mut salsa = Salsa::new(&key.0[0..32], header.id_bytes(), true).unwrap(); let mut poly1305_key = [0_u8; 32]; salsa.crypt_in_place(&mut poly1305_key); let mut poly = Poly1305::new(&poly1305_key).unwrap(); - - poly.update(packet_frag0_payload_bytes); - let _ = payload.append_and_init_bytes(packet_frag0_payload_bytes.len(), |b| salsa.crypt(packet_frag0_payload_bytes, b)); - for f in fragments.iter() { - let _ = f.as_ref().map(|f| { - let _ = f.as_bytes_after(FRAGMENT_HEADER_SIZE).map(|f| { - poly.update(f); - let _ = payload.append_and_init_bytes(f.len(), |b| salsa.crypt(f, b)); - }); - }); - } + poly.update(payload.as_bytes()); if poly.finish()[0..8].eq(&header.message_auth) { break; } + } else { + // Only HELLO is permitted without payload encryption. Drop other packet types if sent this way. + return; } - - CIPHER_AES_GMAC_SIV => { - secret.aes.reset(); - secret.aes.decrypt_init(&header.aes_gmac_siv_tag()); - secret.aes.decrypt_set_aad(&header.aad_bytes()); - - let _ = payload.append_and_init_bytes(packet_frag0_payload_bytes.len(), |b| secret.aes.decrypt(packet_frag0_payload_bytes, b)); - for f in fragments.iter() { - let _ = f.as_ref().map(|f| { - let _ = f.as_bytes_after(FRAGMENT_HEADER_SIZE).map(|f| { - let _ = payload.append_and_init_bytes(f.len(), |b| secret.aes.decrypt(f, b)); - }); - }); - } - - if secret.aes.decrypt_finish() { - break; - } - } - - _ => {} } - if (secret as *const PeerSecrets) != (&rx.static_secret as *const PeerSecrets) { - payload.clear(); - secret = &mut rx.static_secret; - } else { - // Both ephemeral (if any) and static secret have failed, drop packet. - return; + CIPHER_SALSA2012_POLY1305 => { + let key = salsa_derive_per_packet_key(&secret.secret, header, payload.len()); + let mut salsa = Salsa::new(&key.0[0..32], header.id_bytes(), true).unwrap(); + let mut poly1305_key = [0_u8; 32]; + salsa.crypt_in_place(&mut poly1305_key); + let mut poly = Poly1305::new(&poly1305_key).unwrap(); + + poly.update(packet_frag0_payload_bytes); + let _ = payload.append_and_init_bytes(packet_frag0_payload_bytes.len(), |b| salsa.crypt(packet_frag0_payload_bytes, b)); + for f in fragments.iter() { + let _ = f.as_ref().map(|f| f.as_bytes_starting_at(FRAGMENT_HEADER_SIZE).map(|f| { + poly.update(f); + let _ = payload.append_and_init_bytes(f.len(), |b| salsa.crypt(f, b)); + })); + } + + if poly.finish()[0..8].eq(&header.message_auth) { + break; + } } + + CIPHER_AES_GMAC_SIV => { + let mut aes = secret.aes.get(); + aes.decrypt_init(&header.aes_gmac_siv_tag()); + aes.decrypt_set_aad(&header.aad_bytes()); + + let _ = payload.append_and_init_bytes(packet_frag0_payload_bytes.len(), |b| aes.decrypt(packet_frag0_payload_bytes, b)); + for f in fragments.iter() { + let _ = f.as_ref().map(|f| f.as_bytes_starting_at(FRAGMENT_HEADER_SIZE).map(|f| payload.append_and_init_bytes(f.len(), |b| aes.decrypt(f, b)))); + } + + if aes.decrypt_finish() { + break; + } + } + + _ => {} } - (secret as *const PeerSecrets) != (&(rx.static_secret) as *const PeerSecrets) - }; - // If we make it here we've successfully decrypted and authenticated the packet. + if (secret as *const PeerSecret) == (&self.static_secret as *const PeerSecret) { + // If the static secret failed to authenticate it means we either didn't have an + // ephemeral key or the ephemeral also failed (as it's tried first). + return; + } else { + // If ephemeral failed, static secret will be tried. Set forward secrecy to false. + forward_secrecy = false; + payload.clear(); + } + } + drop(ephemeral_secret); - rx.last_receive_time_ticks = time_ticks; - rx.total_bytes += payload.len() as u64; + // If decryption and authentication succeeded, the code above will break out of the + // for loop and end up here. Otherwise it returns from the whole function. - // Unlock rx state mutex. - drop(rx); + self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); + let _ = self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed); let _ = payload.u8_at(0).map(|verb| { // For performance reasons we let VL2 handle packets first. It returns false - // if it didn't pick up anything. + // if it didn't handle the packet, in which case it's handled at VL1. if !ph.handle_packet(self, source_path, forward_secrecy, verb, &payload) { match verb { VERB_VL1_NOP => {} @@ -311,13 +274,13 @@ impl Peer { } } }); - } + }); } /// Get the remote version of this peer: major, minor, revision, and build. /// Returns None if it's not yet known. pub fn version(&self) -> Option<[u16; 4]> { - let rv = self.rx.lock().remote_version; + let rv = self.remote_version.load(Ordering::Relaxed); if rv != 0 { Some([(rv >> 48) as u16, (rv >> 32) as u16, (rv >> 16) as u16, rv as u16]) } else { @@ -327,7 +290,7 @@ impl Peer { /// Get the remote protocol version of this peer or None if not yet known. pub fn protocol_version(&self) -> Option { - let pv = self.rx.lock().remote_protocol_version; + let pv = self.remote_protocol_version.load(Ordering::Relaxed); if pv != 0 { Some(pv) } else {