diff --git a/crypto/src/mimcvdf.rs b/crypto/src/mimcvdf.rs index bfb49921f..030dbff69 100644 --- a/crypto/src/mimcvdf.rs +++ b/crypto/src/mimcvdf.rs @@ -7,7 +7,7 @@ */ /* - * MIMC is a hash function originally designed for use with STARK and SNARK proofs. It's based + * MIMC is a cipher originally designed for use with STARK and SNARK proofs. It's based * on modular multiplication and exponentiation instead of the usual bit twiddling or ARX * operations that underpin more common hash algorithms. * @@ -17,7 +17,8 @@ * compute intensive. The "forward" direction simply requires modular cubing which is two modular * multiplications and is much faster. * - * It's also nice because it's incredibly simple with a tiny code footprint. + * It's a nice VDF because it's incredibly simple with a tiny code footprint. Most other VDFs + * involve RSA group operations or zero knowledge proofs. * * This is used for anti-DOS and anti-spamming delay functions. It's not used for anything * really "cryptographically hard," and if it were broken cryptographically it would still be diff --git a/rustfmt.toml b/rustfmt.toml index 554a221fb..c532918b9 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,5 +1,5 @@ #unstable_features = true -max_width = 140 +max_width = 150 #use_small_heuristics = "Max" edition = "2021" #empty_item_single_line = true diff --git a/zssp/Cargo.toml b/zssp/Cargo.toml index d273a4013..29cc28958 100644 --- a/zssp/Cargo.toml +++ b/zssp/Cargo.toml @@ -24,4 +24,4 @@ doc = false [dependencies] zerotier-utils = { path = "../utils" } zerotier-crypto = { path = "../crypto" } -pqc_kyber = { git = "https://github.com/Argyle-Software/kyber", rev = "8c7927e00f4e3508769bf69afd55b2be1c22884d", features = ["kyber1024", "std"], default-features = false } +pqc_kyber = { version = "0.4.0", default-features = false, features = ["kyber1024", "std"] } diff --git a/zssp/src/error.rs b/zssp/src/error.rs index e416a8954..5fd35c9b4 100644 --- a/zssp/src/error.rs +++ b/zssp/src/error.rs @@ -6,6 +6,7 @@ * https://www.zerotier.com/ */ +#[derive(PartialEq, Eq)] pub enum Error { /// The packet was addressed to an unrecognized local session (should usually be ignored) UnknownLocalSessionId, diff --git a/zssp/src/main.rs b/zssp/src/main.rs index 02b86b6ae..52f84824b 100644 --- a/zssp/src/main.rs +++ b/zssp/src/main.rs @@ -52,6 +52,7 @@ fn alice_main( let mut last_ratchet_count = 0; let test_data = [1u8; TEST_MTU * 10]; let mut up = false; + let mut last_error = zssp::Error::UnknownProtocolVersion; let alice_session = context .open( @@ -100,9 +101,10 @@ fn alice_main( } Ok(zssp::ReceiveResult::Rejected) => {} Err(e) => { - println!("[alice] ERROR {}", e.to_string()); - //run.store(false, Ordering::SeqCst); - //break; + if e != last_error { + println!("[alice] ERROR {}", e.to_string()); + last_error = e; + } } } } @@ -163,6 +165,7 @@ fn bob_main( let mut last_speed_metric = ms_monotonic(); let mut next_service = last_speed_metric + 500; let mut transferred = 0u64; + let mut last_error = zssp::Error::UnknownProtocolVersion; let mut bob_session = None; @@ -207,9 +210,10 @@ fn bob_main( } Ok(zssp::ReceiveResult::Rejected) => {} Err(e) => { - println!("[bob] ERROR {}", e.to_string()); - //run.store(false, Ordering::SeqCst); - //break; + if e != last_error { + println!("[bob] ERROR {}", e.to_string()); + last_error = e; + } } } } diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index b510cdba0..01985c22b 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -7,9 +7,9 @@ */ // ZSSP: ZeroTier Secure Session Protocol -// FIPS compliant Noise_XK with Jedi powers and built-in attack-resistant large payload (fragmentation) support. +// FIPS compliant Noise_XK with Jedi powers (Kyber1024) and built-in attack-resistant large payload (fragmentation) support. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroU64; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -20,8 +20,6 @@ use zerotier_crypto::p384::{P384KeyPair, P384PublicKey, P384_ECDH_SHARED_SECRET_ use zerotier_crypto::secret::Secret; use zerotier_crypto::{random, secure_eq}; -use zerotier_utils::arrayvec::ArrayVec; - use pqc_kyber::{KYBER_SECRETKEYBYTES, KYBER_SSBYTES}; use crate::applicationlayer::ApplicationLayer; @@ -39,7 +37,10 @@ pub struct Context { defrag: Mutex< HashMap< (Application::PhysicalPath, u64), - Arc, i64)>>, + Arc<( + Mutex>, + i64, // creation timestamp + )>, >, >, sessions: RwLock>, @@ -110,7 +111,7 @@ struct OutgoingSessionInit { alice_noise_e_secret: P384KeyPair, noise_es: Secret, alice_hk_secret: Secret, - metadata: Option>, + metadata: Option>, init_packet: [u8; AliceNoiseXKInit::SIZE], } @@ -144,6 +145,8 @@ struct SessionKey { impl Context { /// Create a new session context. + /// + /// * `max_incomplete_session_queue_size` - Maximum number of incomplete sessions in negotiation phase pub fn new(max_incomplete_session_queue_size: usize) -> Self { Self { max_incomplete_session_queue_size, @@ -162,12 +165,7 @@ impl Context { /// * `send` - Function to send packets to remote sessions /// * `mtu` - Physical MTU /// * `current_time` - Current monotonic time in milliseconds - pub fn service>, &mut [u8])>( - &self, - mut send: SendFunction, - mtu: usize, - current_time: i64, - ) -> i64 { + pub fn service>, &mut [u8])>(&self, mut send: SendFunction, mtu: usize, current_time: i64) -> i64 { let mut dead_active = Vec::new(); let mut dead_pending = Vec::new(); let retry_cutoff = current_time - Application::RETRY_INTERVAL; @@ -225,9 +223,7 @@ impl Context { // Check whether we need to rekey if there is no pending offer or if the last rekey // offer was before retry_cutoff (checked in the 'match' above). if let Some(key) = state.keys[state.current_key].as_ref() { - if key.bob - && (current_time >= key.rekey_at_time - || session.send_counter.load(Ordering::Relaxed) >= key.rekey_at_counter) + if key.bob && (current_time >= key.rekey_at_time || session.send_counter.load(Ordering::Relaxed) >= key.rekey_at_counter) { drop(state); session.initiate_rekey(|b| send(&session, b), current_time); @@ -240,7 +236,7 @@ impl Context { } for (id, incoming) in sessions.incoming.iter() { - if incoming.timestamp < negotiation_timeout_cutoff { + if incoming.timestamp <= negotiation_timeout_cutoff { dead_pending.push(*id); } } @@ -256,6 +252,8 @@ impl Context { } } + self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff); + Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL) } @@ -279,11 +277,11 @@ impl Context { mtu: usize, remote_s_public_p384: &P384PublicKey, psk: Secret, - metadata: Option<&[u8]>, + metadata: Option>, application_data: Application::Data, current_time: i64, ) -> Result>, Error> { - if (metadata.map(|md| md.len()).unwrap_or(0) + app.get_local_s_public_blob().len()) > MAX_INIT_PAYLOAD_SIZE { + if (metadata.as_ref().map(|md| md.len()).unwrap_or(0) + app.get_local_s_public_blob().len()) > MAX_INIT_PAYLOAD_SIZE { return Err(Error::DataTooLarge); } @@ -320,7 +318,7 @@ impl Context { alice_noise_e_secret, noise_es: noise_es.clone(), alice_hk_secret: Secret(alice_hk_secret.secret), - metadata: metadata.map(|md| ArrayVec::try_from(md).unwrap()), + metadata, init_packet: [0u8; AliceNoiseXKInit::SIZE], })), }), @@ -377,8 +375,9 @@ impl Context { /// Receive, authenticate, decrypt, and process a physical wire packet. /// /// The send function may be called one or more times to send packets. If the packet is associated - /// wtth an active session this session is supplied, otherwise this parameter is None. The size - /// of packets to be sent will not exceed the supplied mtu. + /// wtth an active session this session is supplied, otherwise this parameter is None and the packet + /// should be a reply to the current incoming packet. The size of packets to be sent will not exceed + /// the supplied mtu. /// /// The check_allow_incoming_session function is called when an initial Noise_XK init message is /// received. This is before anything is known about the caller. A return value of true proceeds @@ -425,14 +424,7 @@ impl Context { let mut incoming = None; if let Some(local_session_id) = SessionId::new_from_u64_le(u64::from_le_bytes(incoming_packet[0..8].try_into().unwrap())) { - if let Some(session) = self - .sessions - .read() - .unwrap() - .active - .get(&local_session_id) - .and_then(|s| s.upgrade()) - { + if let Some(session) = self.sessions.read().unwrap().active.get(&local_session_id).and_then(|s| s.upgrade()) { debug_assert!(!self.sessions.read().unwrap().incoming.contains_key(&local_session_id)); session @@ -443,8 +435,7 @@ impl Context { if session.check_receive_window(incoming_counter) { if fragment_count > 1 { let mut fragged = session.defrag[(incoming_counter as usize) % COUNTER_WINDOW_MAX_OOO].lock().unwrap(); - if let Some(assembled_packet) = fragged.assemble(incoming_counter, incoming_packet_buf, fragment_no, fragment_count) - { + if let Some(assembled_packet) = fragged.assemble(incoming_counter, incoming_packet_buf, fragment_no, fragment_count) { drop(fragged); return self.process_complete_incoming_packet( app, @@ -500,18 +491,36 @@ impl Context { let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_packet); if fragment_count > 1 { - let fragged_m = { + let f = { let mut defrag = self.defrag.lock().unwrap(); - defrag + let f = defrag .entry((source.clone(), incoming_counter)) - .or_insert_with(|| Arc::new(Mutex::new((Fragged::new(), current_time)))) - .clone() + .or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time))) + .clone(); + + // Anti-DOS emergency cleaning of the incoming defragmentation queue for packets not + // associated with known sessions. + if defrag.len() >= self.max_incomplete_session_queue_size { + // First, drop all entries that are timed out or whose physical source duplicates another entry. + let mut sources = HashSet::with_capacity(defrag.len()); + let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; + defrag.retain(|k, fragged| (fragged.1 > negotiation_timeout_cutoff && sources.insert(k.0.clone())) || Arc::ptr_eq(fragged, &f)); + + // Then, if we are still at or over the limit, drop 10% of remaining entries at random. + if defrag.len() >= self.max_incomplete_session_queue_size { + let mut rn = random::next_u32_secure(); + defrag.retain(|_, fragged| { + rn = prng32(rn); + rn > (u32::MAX / 10) || Arc::ptr_eq(fragged, &f) + }); + } + } + + f }; - let mut fragged = fragged_m.lock().unwrap(); - if let Some(assembled_packet) = fragged - .0 - .assemble(incoming_counter, incoming_packet_buf, fragment_no, fragment_count) - { + let mut fragged = f.0.lock().unwrap(); + + if let Some(assembled_packet) = fragged.assemble(incoming_counter, incoming_packet_buf, fragment_no, fragment_count) { self.defrag.lock().unwrap().remove(&(source.clone(), incoming_counter)); return self.process_complete_incoming_packet( app, @@ -610,10 +619,7 @@ impl Context { return Err(Error::DataBufferTooSmall); } let payload_end = last_fragment.len() - AES_GCM_TAG_SIZE; - c.crypt( - &last_fragment[HEADER_SIZE..payload_end], - &mut data_buf[current_frag_data_start..data_len], - ); + c.crypt(&last_fragment[HEADER_SIZE..payload_end], &mut data_buf[current_frag_data_start..data_len]); let aead_authentication_ok = c.finish_decrypt(&last_fragment[payload_end..]); key.return_receive_cipher(c); @@ -639,9 +645,8 @@ impl Context { } else { state.current_key = key_index; } - } else { - state.keys[key_index].as_mut().unwrap().confirmed = true; } + state.keys[key_index].as_mut().unwrap().confirmed = true; // If we got a valid data packet from Bob, this means we can cancel any offers // that are still oustanding for initialization. @@ -656,7 +661,7 @@ impl Context { if packet_type == PACKET_TYPE_DATA { return Ok(ReceiveResult::OkData(session, &mut data_buf[..data_len])); } else { - println!("nop"); + return Ok(ReceiveResult::Ok); } } else { return Err(Error::OutOfSequence); @@ -745,10 +750,7 @@ impl Context { let bob_noise_e = bob_noise_e_secret.public_key_bytes().clone(); let noise_es_ee = Secret(hmac_sha512( noise_es.as_bytes(), - bob_noise_e_secret - .agree(&alice_noise_e) - .ok_or(Error::FailedAuthentication)? - .as_bytes(), + bob_noise_e_secret.agree(&alice_noise_e).ok_or(Error::FailedAuthentication)?.as_bytes(), )); let (bob_hk_ciphertext, hk) = pqc_kyber::encapsulate(&pkt.alice_hk_public, &mut random::SecureRandom::default()) .map_err(|_| Error::FailedAuthentication) @@ -960,8 +962,7 @@ impl Context { // key exchange. Bob won't be able to do this until he decrypts and parses Alice's // identity, so the first HMAC is to let him authenticate that first. let hmac_es_ee_se_hk_psk = hmac_sha384_2( - kbkdf::(noise_es_ee_se_hk_psk.as_bytes()) - .as_bytes(), + kbkdf::(noise_es_ee_se_hk_psk.as_bytes()).as_bytes(), &reply_message_nonce, &reply_buffer[HEADER_SIZE..reply_len], ); @@ -972,14 +973,8 @@ impl Context { { let mut state = session.state.write().unwrap(); let _ = state.remote_session_id.insert(bob_session_id); - let _ = state.keys[0].insert(SessionKey::new::( - noise_es_ee_se_hk_psk, - 1, - current_time, - 2, - false, - false, - )); + let _ = + state.keys[0].insert(SessionKey::new::(noise_es_ee_se_hk_psk, 1, current_time, 2, false, false)); debug_assert!(state.keys[1].is_none()); state.current_key = 0; state.current_offer = Offer::NoiseXKAck(Box::new(OutgoingSessionAck { @@ -1037,8 +1032,7 @@ impl Context { if !secure_eq( &pkt_assembled[auth_start..pkt_assembled.len() - HMAC_SHA384_SIZE], &hmac_sha384_2( - kbkdf::(incoming.noise_es_ee.as_bytes()) - .as_bytes(), + kbkdf::(incoming.noise_es_ee.as_bytes()).as_bytes(), &incoming_message_nonce, &pkt_assembled[HEADER_SIZE..auth_start], ), @@ -1111,8 +1105,7 @@ impl Context { if !secure_eq( &pkt_assembly_buffer_copy[auth_start + HMAC_SHA384_SIZE..pkt_assembled.len()], &hmac_sha384_2( - kbkdf::(noise_es_ee_se_hk_psk.as_bytes()) - .as_bytes(), + kbkdf::(noise_es_ee_se_hk_psk.as_bytes()).as_bytes(), &incoming_message_nonce, &pkt_assembly_buffer_copy[HEADER_SIZE..auth_start + HMAC_SHA384_SIZE], ), @@ -1130,14 +1123,7 @@ impl Context { state: RwLock::new(State { remote_session_id: Some(incoming.alice_session_id), keys: [ - Some(SessionKey::new::( - noise_es_ee_se_hk_psk, - 1, - current_time, - 2, - true, - true, - )), + Some(SessionKey::new::(noise_es_ee_se_hk_psk, 1, current_time, 2, true, true)), None, ], current_key: 0, @@ -1213,9 +1199,9 @@ impl Context { reply_buf[RekeyAck::AUTH_START..].copy_from_slice(&c.finish_encrypt()); key.return_send_cipher(c); - session.header_protection_cipher.encrypt_block_in_place( - &mut reply_buf[HEADER_PROTECT_ENCRYPT_START..HEADER_PROTECT_ENCRYPT_END], - ); + session + .header_protection_cipher + .encrypt_block_in_place(&mut reply_buf[HEADER_PROTECT_ENCRYPT_START..HEADER_PROTECT_ENCRYPT_END]); send(Some(&session), &mut reply_buf); // The new "Bob" doesn't know yet if Alice has received the new key, so the @@ -1323,12 +1309,7 @@ impl Session { /// * `mtu_sized_buffer` - A writable work buffer whose size also specifies the physical MTU /// * `data` - Data to send #[inline] - pub fn send( - &self, - mut send: SendFunction, - mtu_sized_buffer: &mut [u8], - mut data: &[u8], - ) -> Result<(), Error> { + pub fn send(&self, mut send: SendFunction, mtu_sized_buffer: &mut [u8], mut data: &[u8]) -> Result<(), Error> { debug_assert!(mtu_sized_buffer.len() >= MIN_TRANSPORT_MTU); let state = self.state.read().unwrap(); if let Some(remote_session_id) = state.remote_session_id { @@ -1338,8 +1319,7 @@ impl Session { let mut c = session_key.get_send_cipher(counter)?; c.reset_init_gcm(&create_message_nonce(PACKET_TYPE_DATA, counter)); - let fragment_count = - (((data.len() + AES_GCM_TAG_SIZE) as f32) / (mtu_sized_buffer.len() - HEADER_SIZE) as f32).ceil() as usize; + let fragment_count = (((data.len() + AES_GCM_TAG_SIZE) as f32) / (mtu_sized_buffer.len() - HEADER_SIZE) as f32).ceil() as usize; let fragment_max_chunk_size = mtu_sized_buffer.len() - HEADER_SIZE; let last_fragment_no = fragment_count - 1; @@ -1392,15 +1372,9 @@ impl Session { c.reset_init_gcm(&create_message_nonce(PACKET_TYPE_NOP, counter)); nop[HEADER_SIZE..].copy_from_slice(&c.finish_encrypt()); session_key.return_send_cipher(c); - set_packet_header( - &mut nop, - 1, - 0, - PACKET_TYPE_NOP, - u64::from(remote_session_id), - state.current_key, - counter, - ); + set_packet_header(&mut nop, 1, 0, PACKET_TYPE_NOP, u64::from(remote_session_id), state.current_key, counter); + self.header_protection_cipher + .encrypt_block_in_place(&mut nop[HEADER_PROTECT_ENCRYPT_START..HEADER_PROTECT_ENCRYPT_END]); send(&mut nop); } } @@ -1628,8 +1602,7 @@ impl SessionKey { send_cipher_pool: Mutex::new(Vec::with_capacity(2)), rekey_at_time: current_time .checked_add( - Application::REKEY_AFTER_TIME_MS - + ((random::xorshift64_random() as u32) % Application::REKEY_AFTER_TIME_MS_MAX_JITTER) as i64, + Application::REKEY_AFTER_TIME_MS + ((random::xorshift64_random() as u32) % Application::REKEY_AFTER_TIME_MS_MAX_JITTER) as i64, ) .unwrap(), created_at_counter: current_counter, @@ -1715,3 +1688,14 @@ fn kbkdf(key: &[u8]) -> Secret u32 { + // based on lowbias32 from https://nullprogram.com/blog/2018/07/31/ + x = x.wrapping_add(1); // don't get stuck on 0 + x ^= x.wrapping_shr(16); + x = x.wrapping_mul(0x7feb352d); + x ^= x.wrapping_shr(15); + x = x.wrapping_mul(0x846ca68b); + x ^= x.wrapping_shr(16); + x +}