Reorg to move around some V1 protocol stuff and label it as such.

This commit is contained in:
Adam Ierymenko 2022-09-29 13:24:36 -04:00
parent b5e1c4f546
commit de506dc48b
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
4 changed files with 147 additions and 142 deletions

View file

@ -47,7 +47,7 @@ impl<DatabaseImpl: Database> Controller<DatabaseImpl> {
} }
impl<DatabaseImpl: Database> PathFilter for Controller<DatabaseImpl> { impl<DatabaseImpl: Database> PathFilter for Controller<DatabaseImpl> {
fn check_path<HostSystemImpl: HostSystem>( fn should_use_physical_path<HostSystemImpl: HostSystem>(
&self, &self,
_id: &Identity, _id: &Identity,
_endpoint: &zerotier_network_hypervisor::vl1::Endpoint, _endpoint: &zerotier_network_hypervisor::vl1::Endpoint,

View file

@ -90,10 +90,10 @@ pub trait NodeStorage: Sync + Send + 'static {
fn save_node_identity(&self, id: &Identity); fn save_node_identity(&self, id: &Identity);
} }
/// Trait to be implemented to provide path hints and a filter to approve physical paths /// Trait to be implemented to provide path hints and a filter to approve physical paths.
pub trait PathFilter: Sync + Send + 'static { pub trait PathFilter: Sync + Send + 'static {
/// Called to check and see if a physical address should be used for ZeroTier traffic to a node. /// Called to check and see if a physical address should be used for ZeroTier traffic to a node.
fn check_path<HostSystemImpl: HostSystem>( fn should_use_physical_path<HostSystemImpl: HostSystem>(
&self, &self,
id: &Identity, id: &Identity,
endpoint: &Endpoint, endpoint: &Endpoint,
@ -190,6 +190,7 @@ struct RootInfo<HostSystemImpl: HostSystem> {
online: bool, online: bool,
} }
/// Interval gate objects used ot fire off background tasks, see do_background_tasks().
#[derive(Default)] #[derive(Default)]
struct BackgroundTaskIntervals { struct BackgroundTaskIntervals {
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
@ -200,8 +201,9 @@ struct BackgroundTaskIntervals {
whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
} }
/// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated.
struct WhoisQueueItem<HostSystemImpl: HostSystem> { struct WhoisQueueItem<HostSystemImpl: HostSystem> {
waiting_packets: RingBuffer<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, v1_proto_waiting_packets: RingBuffer<(Weak<Path<HostSystemImpl>>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>,
last_retry_time: i64, last_retry_time: i64,
retry_count: u16, retry_count: u16,
} }
@ -295,65 +297,6 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
self.roots.read().online self.roots.read().online
} }
fn update_best_root(&self, host_system: &HostSystemImpl, time_ticks: i64) {
let roots = self.roots.read();
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut best = None;
let mut latest_hello_reply = 0;
for (r, _) in roots.roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t > latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r);
}
}
if let Some(best) = best {
let mut best_root = self.best_root.write();
if let Some(best_root) = best_root.as_mut() {
if !Arc::ptr_eq(best_root, best) {
debug_event!(
host_system,
"[vl1] new best root: {} (replaced {})",
best.identity.address.to_string(),
best_root.identity.address.to_string()
);
*best_root = best.clone();
}
} else {
debug_event!(
host_system,
"[vl1] new best root: {} (was empty)",
best.identity.address.to_string()
);
let _ = best_root.insert(best.clone());
}
} else {
if let Some(old_best) = self.best_root.write().take() {
debug_event!(
host_system,
"[vl1] new best root: NONE (replaced {})",
old_best.identity.address.to_string()
);
}
}
// Determine if the node is online by whether there is a currently reachable root.
if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() {
if !roots.online {
drop(roots);
self.roots.write().online = true;
host_system.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().online = false;
host_system.event(Event::Online(false));
}
}
pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
const INTERVAL_MS: i64 = 1000; const INTERVAL_MS: i64 = 1000;
const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64);
@ -514,7 +457,65 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
} }
} }
self.update_best_root(host_system, time_ticks); {
let roots = self.roots.read();
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut best = None;
let mut latest_hello_reply = 0;
for (r, _) in roots.roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t > latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r);
}
}
if let Some(best) = best {
let best_root = self.best_root.upgradable_read();
if best_root.as_ref().map_or(true, |br| !Arc::ptr_eq(&br, best)) {
let mut best_root = RwLockUpgradableReadGuard::upgrade(best_root);
if let Some(best_root) = best_root.as_mut() {
debug_event!(
host_system,
"[vl1] selected new best root: {} (replaced {})",
best.identity.address.to_string(),
best_root.identity.address.to_string()
);
*best_root = best.clone();
} else {
debug_event!(
host_system,
"[vl1] selected new best root: {} (was empty)",
best.identity.address.to_string()
);
let _ = best_root.insert(best.clone());
}
}
} else {
if let Some(old_best) = self.best_root.write().take() {
debug_event!(
host_system,
"[vl1] selected new best root: NONE (replaced {})",
old_best.identity.address.to_string()
);
}
}
// Determine if the node is online by whether there is a currently reachable root.
if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() {
if !roots.online {
drop(roots);
self.roots.write().online = true;
host_system.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().online = false;
host_system.event(Event::Online(false));
}
}
} }
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
@ -622,17 +623,22 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
source_local_socket: &HostSystemImpl::LocalSocket, source_local_socket: &HostSystemImpl::LocalSocket,
source_local_interface: &HostSystemImpl::LocalInterface, source_local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64, time_ticks: i64,
mut data: PooledPacketBuffer, mut packet: PooledPacketBuffer,
) { ) {
debug_event!( debug_event!(
host_system, host_system,
"[vl1] {} -> #{} {}->{} length {} (on socket {}@{})", "[vl1] {} -> #{} {}->{} length {} (on socket {}@{})",
source_endpoint.to_string(), source_endpoint.to_string(),
data.bytes_fixed_at::<8>(0) packet
.bytes_fixed_at::<8>(0)
.map_or("????????????????".into(), |pid| hex::to_string(pid)), .map_or("????????????????".into(), |pid| hex::to_string(pid)),
data.bytes_fixed_at::<5>(13).map_or("??????????".into(), |src| hex::to_string(src)), packet
data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| hex::to_string(dest)), .bytes_fixed_at::<5>(13)
data.len(), .map_or("??????????".into(), |src| hex::to_string(src)),
packet
.bytes_fixed_at::<5>(8)
.map_or("??????????".into(), |dest| hex::to_string(dest)),
packet.len(),
source_local_socket.to_string(), source_local_socket.to_string(),
source_local_interface.to_string() source_local_interface.to_string()
); );
@ -642,12 +648,12 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
// and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP // and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP
// is the new V2 Noise-based forward-secure transport protocol. What follows below this // is the new V2 Noise-based forward-secure transport protocol. What follows below this
// is legacy handling of the old v1 protocol. // is legacy handling of the old v1 protocol.
if data.u8_at(8).map_or(false, |x| x == 0xff) { if packet.u8_at(8).map_or(false, |x| x == 0xff) {
todo!(); todo!();
} }
// Legacy ZeroTier V1 packet handling // Legacy ZeroTier V1 packet handling
if let Ok(fragment_header) = data.struct_mut_at::<v1::FragmentHeader>(0) { if let Ok(fragment_header) = packet.struct_mut_at::<v1::FragmentHeader>(0) {
if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
// Packet is addressed to this node. // Packet is addressed to this node.
@ -671,7 +677,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
fragment_header.packet_id(), fragment_header.packet_id(),
fragment_header.fragment_no(), fragment_header.fragment_no(),
fragment_header.total_fragments(), fragment_header.total_fragments(),
data, packet,
time_ticks, time_ticks,
) { ) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() { if let Some(frag0) = assembled_packet.frags[0].as_ref() {
@ -681,7 +687,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Ok(packet_header) = frag0.struct_at::<v1::PacketHeader>(0) { if let Ok(packet_header) = frag0.struct_at::<v1::PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) { if let Some(peer) = self.peer(source) {
peer.receive( peer.v1_proto_receive(
self, self,
host_system, host_system,
inner, inner,
@ -708,11 +714,11 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks);
} }
} }
} } // else source address invalid
} } // else header incomplete
} } // else reassembly failed (in a way that shouldn't be possible)
} } // else packet not fully assembled yet
} else if let Ok(packet_header) = data.struct_at::<v1::PacketHeader>(0) { } else if let Ok(packet_header) = packet.struct_at::<v1::PacketHeader>(0) {
debug_event!( debug_event!(
host_system, host_system,
"[vl1] [v1] #{:0>16x} is unfragmented", "[vl1] [v1] #{:0>16x} is unfragmented",
@ -721,18 +727,19 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) { if let Some(peer) = self.peer(source) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]); peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]);
} else { } else {
self.whois(host_system, source, Some((Arc::downgrade(&path), data)), time_ticks); self.whois(host_system, source, Some((Arc::downgrade(&path), packet)), time_ticks);
} }
} }
} } // else not fragment and header incomplete
} else { } else if self.this_node_is_root() {
// Packet is addressed somewhere else. // Packet is addressed somewhere else, forward if this node is a root.
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
let debug_packet_id; let debug_packet_id;
// Increment and check hop count in packet header, return if max hops exceeded or error.
if fragment_header.is_fragment() { if fragment_header.is_fragment() {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
@ -749,7 +756,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
return; return;
} }
} else if let Ok(packet_header) = data.struct_mut_at::<v1::PacketHeader>(0) { } else if let Ok(packet_header) = packet.struct_mut_at::<v1::PacketHeader>(0) {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
debug_packet_id = u64::from_be_bytes(packet_header.id); debug_packet_id = u64::from_be_bytes(packet_header.id);
@ -774,12 +781,22 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
} }
if let Some(peer) = self.peer(dest) { if let Some(peer) = self.peer(dest) {
// TODO: SHOULD we forward? Need a way to check. if let Some(forward_path) = peer.direct_path() {
peer.forward(host_system, time_ticks, data.as_ref()); host_system.wire_send(
#[cfg(debug_assertions)] &forward_path.endpoint,
debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); Some(&forward_path.local_socket),
Some(&forward_path.local_interface),
packet.as_bytes(),
0,
);
peer.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
#[cfg(debug_assertions)]
debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id);
}
} }
} } // else not for this node and shouldn't be forwarded
} }
} }
} }
@ -796,12 +813,12 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
{ {
let mut whois_queue = self.whois_queue.lock(); let mut whois_queue = self.whois_queue.lock();
let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem::<HostSystemImpl> { let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem::<HostSystemImpl> {
waiting_packets: RingBuffer::new(), v1_proto_waiting_packets: RingBuffer::new(),
last_retry_time: 0, last_retry_time: 0,
retry_count: 0, retry_count: 0,
}); });
if let Some(p) = waiting_packet { if let Some(p) = waiting_packet {
qi.waiting_packets.add(p); qi.v1_proto_waiting_packets.add(p);
} }
if qi.retry_count > 0 { if qi.retry_count > 0 {
return; return;
@ -859,10 +876,10 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
.and_then(|peer| Some(peers.entry(address).or_insert(peer).clone())) .and_then(|peer| Some(peers.entry(address).or_insert(peer).clone()))
}) { }) {
drop(peers); drop(peers);
for p in qi.waiting_packets.iter() { for p in qi.v1_proto_waiting_packets.iter() {
if let Some(path) = p.0.upgrade() { if let Some(path) = p.0.upgrade() {
if let Ok(packet_header) = p.1.struct_at::<v1::PacketHeader>(0) { if let Ok(packet_header) = p.1.struct_at::<v1::PacketHeader>(0) {
peer.receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]);
} }
} }
} }
@ -1070,7 +1087,7 @@ pub struct DummyPathFilter;
impl PathFilter for DummyPathFilter { impl PathFilter for DummyPathFilter {
#[inline(always)] #[inline(always)]
fn check_path<HostSystemImpl: HostSystem>( fn should_use_physical_path<HostSystemImpl: HostSystem>(
&self, &self,
_id: &Identity, _id: &Identity,
_endpoint: &Endpoint, _endpoint: &Endpoint,

View file

@ -106,12 +106,6 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
} }
} }
/// Get the next message ID for sending a message to this peer.
#[inline(always)]
pub(crate) fn next_message_id(&self) -> MessageId {
self.message_id_counter.fetch_add(1, Ordering::SeqCst)
}
/// Get current best path or None if there are no direct paths to this peer. /// Get current best path or None if there are no direct paths to this peer.
pub fn direct_path(&self) -> Option<Arc<Path<HostSystemImpl>>> { pub fn direct_path(&self) -> Option<Arc<Path<HostSystemImpl>>> {
for p in self.paths.lock().iter() { for p in self.paths.lock().iter() {
@ -192,13 +186,24 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
prioritize_paths(&mut paths); prioritize_paths(&mut paths);
} }
#[inline(always)]
pub(crate) fn v1_proto_next_message_id(&self) -> MessageId {
self.message_id_counter.fetch_add(1, Ordering::SeqCst)
}
/// Called every SERVICE_INTERVAL_MS by the background service loop in Node. /// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node<HostSystemImpl>, time_ticks: i64) -> bool { pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node<HostSystemImpl>, time_ticks: i64) -> bool {
// Prune dead paths and sort in descending order of quality.
{ {
let mut paths = self.paths.lock(); let mut paths = self.paths.lock();
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
if paths.capacity() > 16 {
paths.shrink_to_fit();
}
prioritize_paths(&mut paths); prioritize_paths(&mut paths);
} }
// Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs).
self.remote_node_info self.remote_node_info
.write() .write()
.reported_local_endpoints .reported_local_endpoints
@ -206,7 +211,8 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
(time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
} }
fn internal_send( /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed.
fn v1_proto_internal_send(
&self, &self,
host_system: &HostSystemImpl, host_system: &HostSystemImpl,
endpoint: &Endpoint, endpoint: &Endpoint,
@ -280,7 +286,8 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
}; };
let max_fragment_size = path.endpoint.max_fragment_size(); let max_fragment_size = path.endpoint.max_fragment_size();
if self.remote_node_info.read().remote_protocol_version >= 12 {
if self.remote_node_info.read().remote_protocol_version >= 12 || self.identity.p384.is_some() {
let flags_cipher_hops = if packet.len() > max_fragment_size { let flags_cipher_hops = if packet.len() > max_fragment_size {
v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV
} else { } else {
@ -288,7 +295,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
}; };
let mut aes_gmac_siv = self.static_symmetric_key.aes_gmac_siv.get(); let mut aes_gmac_siv = self.static_symmetric_key.aes_gmac_siv.get();
aes_gmac_siv.encrypt_init(&self.next_message_id().to_ne_bytes()); aes_gmac_siv.encrypt_init(&self.v1_proto_next_message_id().to_ne_bytes());
aes_gmac_siv.encrypt_set_aad(&v1::get_packet_aad_bytes( aes_gmac_siv.encrypt_set_aad(&v1::get_packet_aad_bytes(
self.identity.address, self.identity.address,
node.identity.address, node.identity.address,
@ -317,11 +324,11 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
v1::CIPHER_SALSA2012_POLY1305 v1::CIPHER_SALSA2012_POLY1305
}; };
let (mut salsa, poly1305_otk) = salsa_poly_create( let (mut salsa, poly1305_otk) = v1_proto_salsa_poly_create(
&self.static_symmetric_key, &self.static_symmetric_key,
{ {
let header = packet.struct_mut_at::<v1::PacketHeader>(0).unwrap(); let header = packet.struct_mut_at::<v1::PacketHeader>(0).unwrap();
header.id = self.next_message_id().to_ne_bytes(); header.id = self.v1_proto_next_message_id().to_ne_bytes();
header.dest = self.identity.address.to_bytes(); header.dest = self.identity.address.to_bytes();
header.src = node.identity.address.to_bytes(); header.src = node.identity.address.to_bytes();
header.flags_cipher_hops = flags_cipher_hops; header.flags_cipher_hops = flags_cipher_hops;
@ -339,7 +346,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
packet.as_bytes_mut()[v1::MAC_FIELD_INDEX..(v1::MAC_FIELD_INDEX + 8)].copy_from_slice(&tag[0..8]); packet.as_bytes_mut()[v1::MAC_FIELD_INDEX..(v1::MAC_FIELD_INDEX + 8)].copy_from_slice(&tag[0..8]);
} }
self.internal_send( self.v1_proto_internal_send(
host_system, host_system,
&path.endpoint, &path.endpoint,
Some(&path.local_socket), Some(&path.local_socket),
@ -353,28 +360,6 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
return true; return true;
} }
/// Forward a packet to this peer.
///
/// This is called when we receive a packet not addressed to this node and
/// want to pass it along.
///
/// This doesn't fragment large packets since fragments are forwarded individually.
/// Intermediates don't need to adjust fragmentation.
pub(crate) fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool {
if let Some(path) = self.direct_path() {
host_system.wire_send(
&path.endpoint,
Some(&path.local_socket),
Some(&path.local_interface),
packet.as_bytes(),
0,
);
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
return true;
}
return false;
}
/// Send a HELLO to this peer. /// Send a HELLO to this peer.
/// ///
/// If explicit_endpoint is not None the packet will be sent directly to this endpoint. /// If explicit_endpoint is not None the packet will be sent directly to this endpoint.
@ -406,7 +391,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
let mut packet = PacketBuffer::new(); let mut packet = PacketBuffer::new();
{ {
let message_id = self.next_message_id(); let message_id = self.v1_proto_next_message_id();
{ {
let f: &mut (v1::PacketHeader, v1::message_component_structs::HelloFixedHeaderFields) = let f: &mut (v1::PacketHeader, v1::message_component_structs::HelloFixedHeaderFields) =
@ -426,7 +411,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
debug_assert_eq!(packet.len(), 41); debug_assert_eq!(packet.len(), 41);
assert!(node.identity.write_public(&mut packet, self.identity.p384.is_none()).is_ok()); assert!(node.identity.write_public(&mut packet, self.identity.p384.is_none()).is_ok());
let (_, poly1305_key) = salsa_poly_create( let (_, poly1305_key) = v1_proto_salsa_poly_create(
&self.static_symmetric_key, &self.static_symmetric_key,
packet.struct_at::<v1::PacketHeader>(0).unwrap(), packet.struct_at::<v1::PacketHeader>(0).unwrap(),
packet.len(), packet.len(),
@ -446,7 +431,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
} }
if let Some(p) = path.as_ref() { if let Some(p) = path.as_ref() {
self.internal_send( self.v1_proto_internal_send(
host_system, host_system,
destination, destination,
Some(&p.local_socket), Some(&p.local_socket),
@ -456,7 +441,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
); );
p.log_send_anything(time_ticks); p.log_send_anything(time_ticks);
} else { } else {
self.internal_send(host_system, destination, None, None, max_fragment_size, &packet); self.v1_proto_internal_send(host_system, destination, None, None, max_fragment_size, &packet);
} }
return true; return true;
@ -468,7 +453,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// those fragments after the main packet header and first chunk. /// those fragments after the main packet header and first chunk.
/// ///
/// This returns true if the packet decrypted and passed authentication. /// This returns true if the packet decrypted and passed authentication.
pub(crate) fn receive<InnerProtocolImpl: InnerProtocol>( pub(crate) fn v1_proto_receive<InnerProtocolImpl: InnerProtocol>(
self: &Arc<Self>, self: &Arc<Self>,
node: &Node<HostSystemImpl>, node: &Node<HostSystemImpl>,
host_system: &HostSystemImpl, host_system: &HostSystemImpl,
@ -482,7 +467,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) {
let mut payload = PacketBuffer::new(); let mut payload = PacketBuffer::new();
let message_id = if let Some(message_id2) = try_aead_decrypt( let message_id = if let Some(message_id2) = v1_proto_try_aead_decrypt(
&self.static_symmetric_key, &self.static_symmetric_key,
packet_frag0_payload_bytes, packet_frag0_payload_bytes,
packet_header, packet_header,
@ -584,7 +569,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
time_ticks: i64, time_ticks: i64,
message_id: MessageId, message_id: MessageId,
source_path: &Arc<Path<HostSystemImpl>>, source_path: &Arc<Path<HostSystemImpl>>,
hops: u8, _hops: u8,
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) {
@ -737,7 +722,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
self.identity.address.to_string(), self.identity.address.to_string(),
r.err().unwrap().to_string() r.err().unwrap().to_string()
); );
break; return PacketHandlerResult::Error;
} }
} }
} else { } else {
@ -884,8 +869,7 @@ impl<HostSystemImpl: HostSystem> PartialEq for Peer<HostSystemImpl> {
impl<HostSystemImpl: HostSystem> Eq for Peer<HostSystemImpl> {} impl<HostSystemImpl: HostSystem> Eq for Peer<HostSystemImpl> {}
/// Attempt ZeroTier V1 protocol AEAD packet encryption and MAC validation. Returns message ID on success. fn v1_proto_try_aead_decrypt(
fn try_aead_decrypt(
secret: &SymmetricSecret, secret: &SymmetricSecret,
packet_frag0_payload_bytes: &[u8], packet_frag0_payload_bytes: &[u8],
packet_header: &v1::PacketHeader, packet_header: &v1::PacketHeader,
@ -904,7 +888,7 @@ fn try_aead_decrypt(
} }
} }
let (mut salsa, poly1305_key) = salsa_poly_create(secret, packet_header, payload.len() + v1::HEADER_SIZE); let (mut salsa, poly1305_key) = v1_proto_salsa_poly_create(secret, packet_header, payload.len() + v1::HEADER_SIZE);
let mac = poly1305::compute(&poly1305_key, &payload.as_bytes()); let mac = poly1305::compute(&poly1305_key, &payload.as_bytes());
if mac[0..8].eq(&packet_header.mac) { if mac[0..8].eq(&packet_header.mac) {
let message_id = u64::from_ne_bytes(packet_header.id); let message_id = u64::from_ne_bytes(packet_header.id);
@ -968,9 +952,7 @@ fn try_aead_decrypt(
} }
} }
/// Create initialized instances of Salsa20/12 and Poly1305 for a packet. fn v1_proto_salsa_poly_create(secret: &SymmetricSecret, header: &v1::PacketHeader, packet_size: usize) -> (Salsa<12>, [u8; 32]) {
/// (Note that this is a legacy cipher suite.)
fn salsa_poly_create(secret: &SymmetricSecret, header: &v1::PacketHeader, packet_size: usize) -> (Salsa<12>, [u8; 32]) {
// Create a per-packet key from the IV, source, destination, and packet size. // Create a per-packet key from the IV, source, destination, and packet size.
let mut key: Secret<32> = secret.key.first_n_clone(); let mut key: Secret<32> = secret.key.first_n_clone();
let hb = header.as_bytes(); let hb = header.as_bytes();

View file

@ -220,8 +220,10 @@ impl RootSet {
fn marshal_internal<const BL: usize>(&self, buf: &mut Buffer<BL>, include_signatures: bool) -> Result<(), UnmarshalError> { fn marshal_internal<const BL: usize>(&self, buf: &mut Buffer<BL>, include_signatures: bool) -> Result<(), UnmarshalError> {
buf.append_u8(0)?; // version byte for future use buf.append_u8(0)?; // version byte for future use
buf.append_varint(self.name.as_bytes().len() as u64)?; buf.append_varint(self.name.as_bytes().len() as u64)?;
buf.append_bytes(self.name.as_bytes())?; buf.append_bytes(self.name.as_bytes())?;
if self.url.is_some() { if self.url.is_some() {
let url = self.url.as_ref().unwrap().as_bytes(); let url = self.url.as_ref().unwrap().as_bytes();
buf.append_varint(url.len() as u64)?; buf.append_varint(url.len() as u64)?;
@ -229,7 +231,9 @@ impl RootSet {
} else { } else {
buf.append_varint(0)?; buf.append_varint(0)?;
} }
buf.append_varint(self.revision)?; buf.append_varint(self.revision)?;
buf.append_varint(self.members.len() as u64)?; buf.append_varint(self.members.len() as u64)?;
for m in self.members.iter() { for m in self.members.iter() {
m.identity.marshal(buf)?; m.identity.marshal(buf)?;
@ -251,7 +255,9 @@ impl RootSet {
buf.append_u8(m.protocol_version)?; buf.append_u8(m.protocol_version)?;
buf.append_varint(0)?; // size of additional fields for future use buf.append_varint(0)?; // size of additional fields for future use
} }
buf.append_varint(0)?; // size of additional fields for future use buf.append_varint(0)?; // size of additional fields for future use
Ok(()) Ok(())
} }
} }