More cleanup of incorporation of async.

This commit is contained in:
Adam Ierymenko 2022-06-16 13:53:43 -04:00
parent 6ad30ba1f5
commit 564df37c1d
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 261 additions and 231 deletions

View file

@ -117,6 +117,14 @@ impl Endpoint {
None
}
}
/// Returns true if this is an endpoint type that requires that large packets be fragmented.
pub fn requires_fragmentation(&self) -> bool {
match self {
Endpoint::Ip(_) | Endpoint::IpUdp(_) | Endpoint::Ethernet(_) | Endpoint::Bluetooth(_) | Endpoint::WifiDirect(_) => true,
_ => false,
}
}
}
impl Marshalable for Endpoint {

View file

@ -276,33 +276,25 @@ impl Identity {
/// Sign a message with this identity.
///
/// If legacy_ed25519_only is true this generates only an ed25519 signature and uses the old
/// format that also includes part of the plaintext hash at the end. The include_algorithms mask
/// will be ignored. Otherwise it will generate a signature for every algorithm with a secret
/// in this identity and that is specified in the include_algorithms bit mask.
/// Identities with P-384 keys sign with that unless legacy_ed25519_only is selected. If this is
/// set the old 96-byte signature plus hash format used in ZeroTier v1 is used.
///
/// A return of None happens if we don't have our secret key(s) or some other error occurs.
pub fn sign(&self, msg: &[u8], include_algorithms: u8, legacy_ed25519_only: bool) -> Option<Vec<u8>> {
pub fn sign(&self, msg: &[u8], legacy_ed25519_only: bool) -> Option<Vec<u8>> {
if self.secret.is_some() {
let secret = self.secret.as_ref().unwrap();
if legacy_ed25519_only {
Some(secret.ed25519.sign_zt(msg).to_vec())
} else if let Some(p384s) = secret.p384.as_ref() {
let mut tmp: Vec<u8> = Vec::with_capacity(1 + P384_ECDSA_SIGNATURE_SIZE);
tmp.push(Self::ALGORITHM_EC_NIST_P384);
let _ = tmp.write_all(&p384s.ecdsa.sign(msg));
Some(tmp)
} else {
let mut tmp: Vec<u8> = Vec::with_capacity(1 + P384_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE);
tmp.push(0);
if secret.p384.is_some() && (include_algorithms & Self::ALGORITHM_EC_NIST_P384) != 0 {
*tmp.first_mut().unwrap() |= Self::ALGORITHM_EC_NIST_P384;
let _ = tmp.write_all(&secret.p384.as_ref().unwrap().ecdsa.sign(msg));
}
if (include_algorithms & Self::ALGORITHM_X25519) != 0 {
*tmp.first_mut().unwrap() |= Self::ALGORITHM_X25519;
let _ = tmp.write_all(&secret.ed25519.sign(msg));
}
if tmp.len() > 1 {
Some(tmp)
} else {
None
}
let mut tmp: Vec<u8> = Vec::with_capacity(1 + ED25519_SIGNATURE_SIZE);
tmp.push(Self::ALGORITHM_X25519);
let _ = tmp.write_all(&secret.ed25519.sign(msg));
Some(tmp)
}
} else {
None
@ -313,34 +305,21 @@ impl Identity {
pub fn verify(&self, msg: &[u8], mut signature: &[u8]) -> bool {
if signature.len() == 96 {
// legacy ed25519-only signature with hash included detected by their unique size.
return ed25519_verify(&self.ed25519, signature, msg);
} else if signature.len() > 1 {
// Otherwise we support compound signatures. Note that it's possible for there to be
// unknown algorithms here if we ever add e.g. a PQ signature scheme and older nodes
// don't support it, and therefore it's valid if all algorithms that are present and
// understood pass signature check. The 'passed' variable makes sure we can't pass without
// verifying at least one signature. If any present and understood algorithm fails the
// whole check fails, so you can't have one good and one bad signature.
let algorithms = signature[0];
signature = &signature[1..];
let mut passed = false; // makes sure we can't pass with an empty signature!
if (algorithms & Self::ALGORITHM_EC_NIST_P384) != 0 && signature.len() >= P384_ECDSA_SIGNATURE_SIZE && self.p384.is_some() {
if !self.p384.as_ref().unwrap().ecdsa.verify(msg, &signature[..P384_ECDSA_SIGNATURE_SIZE]) {
return false;
ed25519_verify(&self.ed25519, &signature[..64], msg)
} else if let Some(algorithm) = signature.get(0) {
if *algorithm == Self::ALGORITHM_EC_NIST_P384 && signature.len() == (1 + P384_ECDSA_SIGNATURE_SIZE) {
if let Some(p384) = self.p384.as_ref() {
p384.ecdsa.verify(msg, &signature[1..])
} else {
false
}
signature = &signature[P384_ECDSA_SIGNATURE_SIZE..];
passed = true;
} else if *algorithm == Self::ALGORITHM_X25519 && signature.len() == (1 + ED25519_SIGNATURE_SIZE) {
ed25519_verify(&self.ed25519, &signature[1..], msg)
} else {
false
}
if (algorithms & Self::ALGORITHM_X25519) != 0 && signature.len() >= ED25519_SIGNATURE_SIZE {
if !ed25519_verify(&self.ed25519, &signature[..ED25519_SIGNATURE_SIZE], msg) {
return false;
}
//signature = &signature[ED25519_SIGNATURE_SIZE..];
passed = true;
}
return passed;
} else {
return false;
false
}
}
@ -371,10 +350,11 @@ impl Identity {
}
/*
* For legacy backward compatibility, any key pairs and other material after the x25519
* keys are prefixed by 0x03 followed by the number of remaining bytes. This allows old nodes
* to parse HELLO normally and ignore the rest of the extended identity. It's ignored by
* newer nodes.
* The prefix of 0x03 is for backward compatibility. Older nodes will interpret this as
* an empty unidentified InetAddress object and will skip the number of bytes following it.
*
* For future compatibility the size field here will allow this to be extended, something
* that should have been in the protocol from the beginning.
*/
buf.append_u8(0x03)?;
let remaining_data_size_field_at = buf.len();
@ -400,10 +380,8 @@ impl Identity {
}
}
buf.append_u8(0xff)?;
// Fill in the remaining data field earmarked above.
*buf.bytes_fixed_mut_at(remaining_data_size_field_at).unwrap() = ((buf.len() - remaining_data_size_field_at) as u16).to_be_bytes();
*buf.bytes_fixed_mut_at(remaining_data_size_field_at).unwrap() = (((buf.len() - remaining_data_size_field_at) - 2) as u16).to_be_bytes();
Ok(())
}
@ -632,13 +610,11 @@ impl Marshalable for Identity {
let mut p384_ecdh_ecdsa_public: Option<(P384PublicKey, P384PublicKey, &[u8; P384_ECDSA_SIGNATURE_SIZE], &[u8; ED25519_SIGNATURE_SIZE])> = None;
let mut p384_ecdh_ecdsa_secret: Option<(&[u8; P384_SECRET_KEY_SIZE], &[u8; P384_SECRET_KEY_SIZE])> = None;
loop {
let mut algorithm = buf.read_u8(cursor)?;
if algorithm == 0 {
algorithm = Self::ALGORITHM_X25519;
}
let mut eof = buf.len();
while *cursor < eof {
let algorithm = buf.read_u8(cursor)?;
match algorithm {
Self::ALGORITHM_X25519 => {
0_u8 | Self::ALGORITHM_X25519 => {
let a = buf.read_bytes_fixed::<C25519_PUBLIC_KEY_SIZE>(cursor)?;
let b = buf.read_bytes_fixed::<ED25519_PUBLIC_KEY_SIZE>(cursor)?;
x25519_public = Some((a, b));
@ -655,7 +631,8 @@ impl Marshalable for Identity {
// This isn't an algorithm; each algorithm is identified by just one bit. This
// indicates the total size of the section after the x25519 keys for backward
// compatibility. See comments in marshal(). New versions can ignore this field.
*cursor += 2;
let bytes_remaining = buf.read_u16(cursor)? as usize;
eof = *cursor + bytes_remaining;
}
Self::ALGORITHM_EC_NIST_P384 => {
let field_length = buf.read_varint(cursor)?;
@ -684,7 +661,6 @@ impl Marshalable for Identity {
p384_ecdh_ecdsa_secret = Some((a, b));
}
}
0xff => break,
_ => {
*cursor += buf.read_varint(cursor)? as usize;
}

View file

@ -1,6 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -111,7 +111,7 @@ pub trait InnerProtocolInterface: Sync + Send + 'static {
/// Check if this remote peer has a trust relationship with this node.
///
/// This is checked to determine if we should do things like make direct links ore respond to
/// This is checked to determine if we should do things like make direct links or respond to
/// various other VL1 messages.
fn has_trust_relationship(&self, id: &Identity) -> bool;
}
@ -421,9 +421,10 @@ impl<SI: SystemInterface> Node<SI> {
for dp in dead_paths.iter() {
self.paths.write().remove(dp);
}
let z = [&crate::util::ZEROES[..1]];
let ka = [tt as u8]; // send different bytes every time for keepalive in case some things filter zero packets
let ka2 = [&ka[..1]];
for ka in need_keepalive.iter() {
si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &z, 0).await;
si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &ka2, 0).await;
}
}
@ -513,14 +514,10 @@ impl<SI: SystemInterface> Node<SI> {
}
}
pub fn root(&self) -> Option<Arc<Peer<SI>>> {
pub fn best_root(&self) -> Option<Arc<Peer<SI>>> {
self.best_root.read().clone()
}
pub fn is_peer_root(&self, peer: &Peer<SI>) -> bool {
self.roots.read().roots.contains_key(peer)
}
pub fn add_update_root_set(&self, rs: RootSet) -> bool {
let mut roots = self.roots.write();
if let Some(entry) = roots.sets.get_mut(&rs.name) {

View file

@ -166,21 +166,6 @@ impl<SI: SystemInterface> Peer<SI> {
/// fatal error occurs performing key agreement between the two identities.
pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_clock: i64) -> Option<Peer<SI>> {
this_node_identity.agree(&id).map(|static_secret| -> Self {
/*
* SECURITY NOTE:
*
* The message ID counter is initialized from the number of minutes since the Unix epoch (according to
* the current clock) in the most significant 26 bits followed by two zero bits followed by 36 random
* bits.
*
* The nature of AES-GMAC-SIV means that message ID duplication is not particularly dangerous, but we
* still want to avoid it. If the clock is at least marginally correct this will mean that message IDs
* will remain unique for over a hundred years. Message IDs are kept secret as well because they are
* encrypted along with a GMAC code to form an opaque 128-bit packet tag.
*
* Keep in mind that we re-key (when talking to new nodes) so not only are duplicate message IDs not
* particularly dangerous in SIV but they'd have to occur while using the same key.
*/
Self {
identity: id,
identity_symmetric_key: SymmetricSecret::new(static_secret),
@ -190,7 +175,7 @@ impl<SI: SystemInterface> Peer<SI> {
last_receive_time_ticks: AtomicI64::new(0),
last_forward_time_ticks: AtomicI64::new(0),
last_hello_reply_time_ticks: AtomicI64::new(0),
message_id_counter: AtomicU64::new(((time_clock as u64) / 60000).wrapping_shl(38) ^ next_u64_secure().wrapping_shr(28)),
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
remote_version: AtomicU64::new(0),
remote_protocol_version: AtomicU8::new(0),
}
@ -203,19 +188,82 @@ impl<SI: SystemInterface> Peer<SI> {
self.message_id_counter.fetch_add(1, Ordering::SeqCst)
}
/// Get current best path or None if there are no direct paths to this peer.
pub fn direct_path(&self) -> Option<Arc<Path<SI>>> {
for p in self.paths.lock().iter() {
let pp = p.path.upgrade();
if pp.is_some() {
return pp;
}
}
return None;
}
/// Get either the current best direct path or an indirect path.
pub fn path(&self, node: &Node<SI>) -> Option<Arc<Path<SI>>> {
let direct_path = self.direct_path();
if direct_path.is_some() {
return direct_path;
}
if let Some(root) = node.best_root() {
return root.direct_path();
}
return None;
}
/// Get the remote version of this peer: major, minor, revision, and build.
/// Returns None if it's not yet known.
pub fn version(&self) -> Option<[u16; 4]> {
let rv = self.remote_version.load(Ordering::Relaxed);
if rv != 0 {
Some([(rv >> 48) as u16, (rv >> 32) as u16, (rv >> 16) as u16, rv as u16])
} else {
None
}
}
/// Get the remote protocol version of this peer or None if not yet known.
pub fn protocol_version(&self) -> Option<u8> {
let pv = self.remote_protocol_version.load(Ordering::Relaxed);
if pv != 0 {
Some(pv)
} else {
None
}
}
/// Sort a list of paths by quality or priority, with best paths first.
fn prioritize_paths(paths: &mut Vec<PeerPath<SI>>) {
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}
/// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
pub(crate) fn service(&self, _: &SI, _: &Node<SI>, time_ticks: i64) -> bool {
{
let mut paths = self.paths.lock();
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
Self::prioritize_paths(&mut paths);
}
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PEER_EXPIRATION_TIME {
true
} else {
false
}
}
/// Receive, decrypt, authenticate, and process an incoming packet from this peer.
///
/// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk.
pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
let mut payload = unsafe { PacketBuffer::new_without_memzero() };
//let mut payload = unsafe { PacketBuffer::new_without_memzero() };
let mut payload = PacketBuffer::new();
// First try decrypting and authenticating with an ephemeral secret if one is negotiated.
let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_symmetric_key.read().as_ref() {
if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) {
// Decryption successful with ephemeral secret
ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed);
(true, message_id)
} else {
// Decryption failed with ephemeral secret, which may indicate that it's obsolete.
@ -226,7 +274,7 @@ impl<SI: SystemInterface> Peer<SI> {
(false, 0)
};
// Then try the permanent secret.
// If forward_secrecy is false it means the ephemeral key failed. Try decrypting with the permanent key.
if !forward_secrecy {
if let Some(message_id2) = try_aead_decrypt(&self.identity_symmetric_key, packet_frag0_payload_bytes, header, fragments, &mut payload) {
// Decryption successful with static secret.
@ -245,6 +293,8 @@ impl<SI: SystemInterface> Peer<SI> {
if payload.len() >= SHA512_HASH_SIZE {
let actual_end_of_payload = payload.len() - SHA512_HASH_SIZE;
let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
hmac.update(&node.identity.fingerprint);
hmac.update(&self.identity.fingerprint);
hmac.update(&message_id.to_ne_bytes());
hmac.update(&payload.as_bytes()[..actual_end_of_payload]);
if !hmac.finish().eq(&payload.as_bytes()[actual_end_of_payload..]) {
@ -270,9 +320,11 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
let mut path_is_known = false;
for p in self.paths.lock().iter_mut() {
if p.path_internal_instance_id == source_path.internal_instance_id {
p.last_receive_time_ticks = time_ticks;
path_is_known = true;
break;
}
}
@ -285,14 +337,14 @@ impl<SI: SystemInterface> Peer<SI> {
if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
match verb {
//VERB_VL1_NOP => {}
verbs::VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ERROR => self.receive_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_OK => self.receive_ok(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_PUSH_DIRECT_PATHS => self.receive_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_USER_MESSAGE => self.receive_user_message(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, path_is_known, forward_secrecy, extended_authentication, &payload).await,
verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_ECHO => self.handle_incoming_echo(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
_ => {}
}
}
@ -300,51 +352,47 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
async fn send_to_endpoint(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, packet: &PacketBuffer) -> bool {
match endpoint {
Endpoint::Ip(_) | Endpoint::IpUdp(_) | Endpoint::Ethernet(_) | Endpoint::Bluetooth(_) | Endpoint::WifiDirect(_) => {
let packet_size = packet.len();
if packet_size > UDP_DEFAULT_MTU {
let bytes = packet.as_bytes();
if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0).await {
return false;
}
/// Send to an endpoint, fragmenting if needed.
///
/// This does not set the fragmentation field in the packet header, MAC, or encrypt the packet. The sender
/// must do that while building the packet. The fragmentation flag must be set if fragmentation will be needed.
async fn internal_send(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, max_fragment_size: usize, packet: &PacketBuffer) -> bool {
let packet_size = packet.len();
if packet_size > max_fragment_size {
let bytes = packet.as_bytes();
if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0).await {
return false;
}
let mut pos = UDP_DEFAULT_MTU;
let mut pos = UDP_DEFAULT_MTU;
let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32;
let fragment_count = (overrun_size / (UDP_DEFAULT_MTU - packet_constants::FRAGMENT_HEADER_SIZE) as u32) + (((overrun_size % (UDP_DEFAULT_MTU - packet_constants::FRAGMENT_HEADER_SIZE) as u32) != 0) as u32);
debug_assert!(fragment_count <= packet_constants::FRAGMENT_COUNT_MAX as u32);
let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32;
let fragment_count = (overrun_size / (UDP_DEFAULT_MTU - packet_constants::FRAGMENT_HEADER_SIZE) as u32) + (((overrun_size % (UDP_DEFAULT_MTU - packet_constants::FRAGMENT_HEADER_SIZE) as u32) != 0) as u32);
debug_assert!(fragment_count <= packet_constants::FRAGMENT_COUNT_MAX as u32);
let mut header = FragmentHeader {
id: *packet.bytes_fixed_at(0).unwrap(),
dest: *packet.bytes_fixed_at(packet_constants::DESTINATION_INDEX).unwrap(),
fragment_indicator: packet_constants::FRAGMENT_INDICATOR,
total_and_fragment_no: ((fragment_count + 1) << 4) as u8,
reserved_hops: 0,
};
let mut header = FragmentHeader {
id: *packet.bytes_fixed_at(0).unwrap(),
dest: *packet.bytes_fixed_at(packet_constants::DESTINATION_INDEX).unwrap(),
fragment_indicator: packet_constants::FRAGMENT_INDICATOR,
total_and_fragment_no: ((fragment_count + 1) << 4) as u8,
reserved_hops: 0,
};
let mut chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - packet_constants::HEADER_SIZE);
loop {
header.total_and_fragment_no += 1;
let next_pos = pos + chunk_size;
if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0).await {
return false;
}
pos = next_pos;
if pos < packet_size {
chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - packet_constants::HEADER_SIZE);
} else {
return true;
}
}
let mut chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - packet_constants::HEADER_SIZE);
loop {
header.total_and_fragment_no += 1;
let next_pos = pos + chunk_size;
if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0).await {
return false;
}
pos = next_pos;
if pos < packet_size {
chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - packet_constants::HEADER_SIZE);
} else {
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await;
return true;
}
}
_ => {
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await;
}
} else {
return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await;
}
}
@ -354,7 +402,7 @@ impl<SI: SystemInterface> Peer<SI> {
/// via a root or some other route.
pub(crate) async fn send(&self, si: &SI, node: &Node<SI>, time_ticks: i64, packet: &PacketBuffer) -> bool {
if let Some(path) = self.path(node) {
if self.send_to_endpoint(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), packet).await {
if self.internal_send(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), if path.endpoint.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX }, packet).await {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
@ -385,7 +433,8 @@ impl<SI: SystemInterface> Peer<SI> {
/// Otherwise it will be sent via the best direct or indirect path known.
///
/// Unlike other messages HELLO is sent partially in the clear and always with the long-lived
/// static identity key.
/// static identity key. Authentication in old versions is via Poly1305 and in new versions
/// via HMAC-SHA512.
pub(crate) async fn send_hello(&self, si: &SI, node: &Node<SI>, explicit_endpoint: Option<&Endpoint>) -> bool {
let mut path = None;
let destination = if let Some(explicit_endpoint) = explicit_endpoint {
@ -399,7 +448,9 @@ impl<SI: SystemInterface> Peer<SI> {
}
};
let max_fragment_size = if destination.requires_fragmentation() { UDP_DEFAULT_MTU } else { usize::MAX };
let time_ticks = si.time_ticks();
let mut packet = PacketBuffer::new();
{
let message_id = self.next_message_id();
@ -419,46 +470,55 @@ impl<SI: SystemInterface> Peer<SI> {
hello_fixed_headers.version_major = VERSION_MAJOR;
hello_fixed_headers.version_minor = VERSION_MINOR;
hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes();
hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes();
hello_fixed_headers.timestamp = si.time_clock().to_be_bytes();
}
// Full identity of the node establishing the session.
assert!(self.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok());
// 8 reserved bytes, must be zero for compatibility with old nodes.
assert!(packet.append_padding(0, 8).is_ok());
// Append two reserved bytes, currently always zero.
assert!(packet.append_padding(0, 2).is_ok());
// Generate a 12-byte nonce for the private section of HELLO.
let mut nonce = get_bytes_secure::<12>();
// LEGACY: create a 16-bit encrypted field that specifies zero "moons." Current nodes ignore this
// and treat it as part of the random AES-CTR nonce, but old versions need it to parse the packet
// correctly.
// Append a 16-byte AES-CTR nonce. LEGACY: for compatibility the last two bytes of this nonce
// are in fact an encryption of two zeroes with Salsa20/12, which old nodes will interpret as
// zero "moons." New nodes will just use these as part of the nonce.
let mut nonce = get_bytes_secure::<16>();
let mut salsa_iv = message_id.to_ne_bytes();
salsa_iv[7] &= 0xf8;
Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&[0_u8, 0_u8], &mut nonce[8..10]);
// Append 12-byte AES-CTR nonce.
Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&crate::util::ZEROES[..2], &mut nonce[14..]);
assert!(packet.append_bytes_fixed(&nonce).is_ok());
// Add session meta-data, which is encrypted using plain AES-CTR. No authentication (AEAD) is needed
// because the whole packet is authenticated. Data in the session is not technically secret in a
// cryptographic sense but we encrypt it for privacy and as a defense in depth.
let mut fields = Dictionary::new();
fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64);
fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
let fields = fields.to_bytes();
assert!(fields.len() <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size
let private_section_start = packet.len();
assert!(packet.append_bytes(fields.as_slice()).is_ok());
let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]);
// because the whole packet is authenticated. While the data in this section is not necessarily critical
// to protect, encrypting it is a defense in depth measure.
let mut session_metadata = Dictionary::new();
session_metadata.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec());
session_metadata.set_u64(session_metadata::TIME_TICKS, time_ticks as u64);
session_metadata.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec());
let session_metadata = session_metadata.to_bytes();
// Prefix encrypted session metadata with its size (in cleartext).
assert!(session_metadata.len() <= 0xffff); // sanity check, should be impossible
assert!(packet.append_u16(session_metadata.len() as u16).is_ok());
// Write session meta-data in encrypted form. A derived key is made using the message ID as a salt
// because this only ever uses the permanent identity key. In V2 we don't want to get near any
// key usage boundaries unless forward secrecy is off for some reason.
nonce[12] &= 0x7f; // mask off the MSB of the 32-bit counter part of the CTR nonce for compatibility with AES libraries that don't wrap
let salted_key = Secret(hmac_sha384(&message_id.to_ne_bytes(), self.identity_symmetric_key.hello_private_section_key.as_bytes()));
let mut aes = AesCtr::new(&salted_key.as_bytes()[0..32]);
aes.init(&nonce);
aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]);
aes.crypt(session_metadata.as_slice(), packet.append_bytes_get_mut(session_metadata.len()).unwrap());
// Set fragment flag if the packet will need to be fragmented.
if (packet.len() + SHA512_HASH_SIZE) > max_fragment_size {
set_packet_fragment_flag(&mut packet);
}
// Seal packet with HMAC-SHA512 extended authentication.
let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
hmac.update(&self.identity.fingerprint);
hmac.update(&node.identity.fingerprint);
hmac.update(&message_id.to_ne_bytes());
hmac.update(&packet.as_bytes()[packet_constants::HEADER_SIZE..]);
assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok());
@ -475,20 +535,20 @@ impl<SI: SystemInterface> Peer<SI> {
}
if let Some(p) = path {
if self.send_to_endpoint(si, &destination, Some(&p.local_socket), Some(&p.local_interface), &packet).await {
if self.internal_send(si, &destination, Some(&p.local_socket), Some(&p.local_interface), max_fragment_size, &packet).await {
p.log_send_anything(time_ticks);
true
} else {
false
}
} else {
self.send_to_endpoint(si, &destination, None, None, &packet).await
self.internal_send(si, &destination, None, None, max_fragment_size, &packet).await
}
}
async fn receive_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_error<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
async fn handle_incoming_error<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
let mut cursor: usize = 1;
if let Ok(error_header) = payload.read_struct::<message_component_structs::ErrorHeader>(&mut cursor) {
let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id);
@ -503,12 +563,49 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
async fn receive_ok<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
async fn handle_incoming_ok<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, mut path_is_known: bool, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) {
let mut cursor: usize = 1;
if let Ok(ok_header) = payload.read_struct::<message_component_structs::OkHeader>(&mut cursor) {
let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id);
let current_packet_id_counter = self.message_id_counter.load(Ordering::Relaxed);
if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX {
// TODO: learn new paths
/*
if !path_is_known {
let mut paths = self.paths.lock();
paths.retain_mut(|p| {
if let Some(path) = p.path.upgrade() {
match (&path.endpoint, &source_path.endpoint) {
(Endpoint::IpUdp(current_address), Endpoint::IpUdp(source_address)) => {
if current_address.ip_bytes().eq(source_address.ip_bytes()) {
// In UDP mode, replace paths that come from the same IP but a different port. This helps avoid
// creating endless duplicate paths in NAT traversal scenarios if a NAT changes its port.
p.path = Arc::downgrade(source_path);
p.path_internal_instance_id = source_path.internal_instance_id;
p.last_receive_time_ticks = time_ticks;
path_is_known = true;
true
} else {
true
}
}
_ => true,
}
} else {
false
}
});
if !path_is_known {
paths.push(PeerPath::<SI> {
path: Arc::downgrade(source_path),
path_internal_instance_id: source_path.internal_instance_id,
last_receive_time_ticks: time_ticks,
});
}
Self::prioritize_paths(&mut paths);
}
*/
match ok_header.in_re_verb {
verbs::VL1_HELLO => {
// TODO
@ -522,71 +619,15 @@ impl<SI: SystemInterface> Peer<SI> {
}
}
async fn receive_whois(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn handle_incoming_whois(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_rendezvous(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn handle_incoming_rendezvous(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_echo(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn handle_incoming_echo(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_push_direct_paths(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn handle_incoming_push_direct_paths(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
async fn receive_user_message(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
/// Get current best path or None if there are no direct paths to this peer.
pub fn direct_path(&self) -> Option<Arc<Path<SI>>> {
for p in self.paths.lock().iter() {
let pp = p.path.upgrade();
if pp.is_some() {
return pp;
}
}
return None;
}
/// Get either the current best direct path or an indirect path.
pub fn path(&self, node: &Node<SI>) -> Option<Arc<Path<SI>>> {
let direct_path = self.direct_path();
if direct_path.is_some() {
return direct_path;
}
if let Some(root) = node.root() {
return root.direct_path();
}
return None;
}
/// Get the remote version of this peer: major, minor, revision, and build.
/// Returns None if it's not yet known.
pub fn version(&self) -> Option<[u16; 4]> {
let rv = self.remote_version.load(Ordering::Relaxed);
if rv != 0 {
Some([(rv >> 48) as u16, (rv >> 32) as u16, (rv >> 16) as u16, rv as u16])
} else {
None
}
}
/// Get the remote protocol version of this peer or None if not yet known.
pub fn protocol_version(&self) -> Option<u8> {
let pv = self.remote_protocol_version.load(Ordering::Relaxed);
if pv != 0 {
Some(pv)
} else {
None
}
}
pub(crate) fn service(&self, _: &SI, _: &Node<SI>, time_ticks: i64) -> bool {
let mut paths = self.paths.lock();
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PEER_EXPIRATION_TIME {
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
true
} else {
paths.clear();
false
}
}
async fn handle_incoming_user_message(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
}
impl<SI: SystemInterface> PartialEq for Peer<SI> {

View file

@ -118,7 +118,7 @@ pub mod packet_constants {
/// Index of destination in both fragment and full packet headers.
pub const DESTINATION_INDEX: usize = 8;
/// Index of 8-byte MAC field in packet header.
/// Index of 8-byte MAC field in packet header (also size of header minus MAC).
pub const MAC_FIELD_INDEX: usize = 19;
/// Mask to select cipher from header flags field.
@ -216,7 +216,7 @@ pub mod security_constants {
pub mod session_metadata {
pub const INSTANCE_ID: &'static str = "i";
pub const CLOCK: &'static str = "t";
pub const TIME_TICKS: &'static str = "t";
pub const SENT_TO: &'static str = "d";
}
@ -280,6 +280,14 @@ pub fn compress_packet<const S: usize>(src: &[u8], dest: &mut Buffer<S>) -> bool
return false;
}
/// Set header flag indicating that a packet is fragmented.
///
/// This will panic if the buffer provided doesn't contain a proper header.
#[inline(always)]
pub fn set_packet_fragment_flag<const S: usize>(pkt: &mut Buffer<S>) {
pkt.as_bytes_mut()[packet_constants::FLAGS_FIELD_INDEX] |= packet_constants::HEADER_FLAG_FRAGMENTED;
}
/// ZeroTier unencrypted outer packet header
///
/// This is the header for a complete packet. If the fragmented flag is set, it will

View file

@ -45,7 +45,7 @@ impl SymmetricSecret {
/// An ephemeral symmetric secret with usage timers and counters.
pub(crate) struct EphemeralSymmetricSecret {
pub secret: SymmetricSecret,
pub decrypt_uses: AtomicUsize,
pub encrypt_uses: AtomicUsize,
}
pub(crate) struct AesGmacSivPoolFactory(Secret<32>, Secret<32>);